Data Engineering/python

EP22 | 고급 Python 활용 #11 | Spark Streaming을 활용한 실시간 데이터 처리

ygtoken 2025. 3. 19. 23:09
728x90

이 글에서 다루는 개념

Apache Spark Streaming은 실시간 데이터 스트리밍을 처리하는 프레임워크입니다.
이번 글에서는 다음 내용을 학습합니다.

  • Spark Streaming 개념 및 아키텍처
  • PySpark Streaming 환경 설정
  • 소켓 데이터 스트림 처리 (socketTextStream)
  • Kafka와 Spark Streaming 연동
  • 실시간 데이터 집계 및 분석

1️⃣ Spark Streaming이란?

📌 Spark Streaming이란?

  • 실시간 데이터 처리 프레임워크
  • 데이터 스트림을 작은 배치(batch) 단위로 처리하여 분석 가능
  • Kafka, Flume, Kinesis, Socket 등 다양한 데이터 소스로부터 스트림 데이터 수집 가능

📌 Spark Streaming 아키텍처

개념 설명
DStream Discretized Stream, Spark에서 스트림 데이터를 처리하는 단위
Receiver 외부 소스(Kafka, Socket 등)에서 데이터를 수집
Batch Interval 데이터를 처리하는 주기 (예: 1초 단위)

2️⃣ PySpark Streaming 환경 설정

📌 Spark Streaming 실행을 위해 PySpark 설치 (pyspark 포함됨)

pip install pyspark

 

📌 Spark StreamingContext 생성

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# SparkContext 및 StreamingContext 생성 (5초 간격으로 배치)
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)
  • "local[2]" → CPU 2개 사용
  • batchInterval=5 → 5초 단위로 데이터를 처리

3️⃣ 소켓 데이터 스트림 처리 (socketTextStream)

📌 TCP 소켓을 통해 스트림 데이터 수집

lines = ssc.socketTextStream("localhost", 9999)

# 단어 분할 및 개수 계산
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

word_counts.pprint()  # 콘솔에 출력

 

📌 스트리밍 시작 및 종료

ssc.start()  # 스트리밍 시작
ssc.awaitTermination()  # 프로그램 종료 방지

 

📌 서버 실행 후 테스트

  1. 새로운 터미널에서 간단한 서버 실행
nc -lk 9999
  1. Spark Streaming 실행 후, 터미널에서 문자 입력하면 해당 단어 개수가 실시간으로 출력됨

4️⃣ Kafka와 Spark Streaming 연동

📌 Kafka 설치 및 실행 (docker-compose 활용)

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
docker-compose up -d

 

📌 Kafka 토픽 생성

docker exec -it kafka kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

 

📌 Kafka Producer 실행 (kafka-console-producer.sh)

docker exec -it kafka kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

 

📌 Kafka Consumer 실행 (kafka-console-consumer.sh)

docker exec -it kafka kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092

 

📌 Spark Streaming에서 Kafka 데이터 수신

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaStream")
ssc = StreamingContext(sc, 5)

# Kafka 스트림 연결
kafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "localhost:9092"})

# 메시지 출력
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()

ssc.start()
ssc.awaitTermination()

5️⃣ 실시간 데이터 집계 및 분석

📌 누적 단어 개수 계산 (updateStateByKey())

def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

word_counts = words.map(lambda word: (word, 1))
cumulative_counts = word_counts.updateStateByKey(updateFunc)

cumulative_counts.pprint()

📌 이전 배치 데이터를 유지하면서 누적 개수 출력


📌 실전 문제: Spark Streaming 연습하기


문제 1: socketTextStream()을 사용하여 실시간 데이터 수집

📌 로컬 TCP 소켓(포트 9999)에서 데이터를 수집하고 출력하세요.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 🔽 여기에 코드 작성
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "SocketStream")
ssc = StreamingContext(sc, 5)

lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()

ssc.start()
ssc.awaitTermination()

문제 2: 단어 개수 계산 (reduceByKey())

📌 실시간 데이터에서 단어 개수를 계산하고 출력하세요.

# 🔽 여기에 코드 작성
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

word_counts.pprint()

문제 3: Kafka에서 데이터 수집 및 출력

📌 Kafka test-topic에서 메시지를 읽고 출력하세요.

from pyspark.streaming.kafka import KafkaUtils
# 🔽 여기에 코드 작성
kafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "localhost:9092"})
messages = kafkaStream.map(lambda x: x[1])

messages.pprint()

문제 4: 누적 단어 개수(updateStateByKey()) 계산하기

📌 단어 개수를 누적하여 유지하면서 업데이트하세요.

# 🔽 여기에 코드 작성
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

cumulative_counts = word_counts.updateStateByKey(updateFunc)
cumulative_counts.pprint()

 

728x90