Data Engineering/python

EP27 | 고급 Python 활용 #16 | Spark를 활용한 머신러닝 기반 이상 탐지 시스템 구축

ygtoken 2025. 3. 19. 23:20
728x90

이 글에서 다루는 개념

Apache Spark의 MLlib을 활용하여 대량의 데이터를 분석하고 이상값(Anomaly)을 탐지하는 방법을 배웁니다.
이번 글에서는 다음 내용을 학습합니다.

  • 이상 탐지(Anomaly Detection) 개념
  • PySpark MLlib을 활용한 이상 탐지 모델 구축
  • 이상 탐지를 위한 통계적 기법 및 머신러닝 기법 비교
  • Spark Streaming을 활용한 실시간 이상 탐지

1️⃣ 이상 탐지(Anomaly Detection)란?

📌 이상 탐지란?

  • 정상적인 패턴에서 벗어난 데이터를 식별하는 기술
  • 금융 사기 탐지, 네트워크 보안, 산업 장비 이상 감지, 품질 관리 등에 활용

📌 이상 탐지 기법 비교

방식 설명
통계적 기법 평균, 표준편차를 활용한 이상 탐지 (IQR, Z-score 등)
머신러닝 기반 이상 탐지 모델 학습 (Isolation Forest, One-Class SVM 등)
딥러닝 기반 Autoencoder, GAN을 활용한 이상 탐지

📌 PySpark에서는 MLlib을 활용하여 이상 탐지 모델을 구현 가능


2️⃣ PySpark MLlib 환경 설정

📌 SparkSession 생성

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AnomalyDetection").getOrCreate()
print("Spark MLlib 환경 설정 완료!")

 

📌 데이터 로드 (sensor_data.csv)

timestamp,temperature,pressure
2024-03-19 12:00:00,25.3,1012.5
2024-03-19 12:01:00,26.1,1011.8
2024-03-19 12:02:00,100.0,950.0  # 이상값
2024-03-19 12:03:00,24.8,1013.2
df = spark.read.csv("sensor_data.csv", header=True, inferSchema=True)
df.show()

 

📌 출력 예시

+-------------------+-----------+---------+
|          timestamp|temperature|pressure |
+-------------------+-----------+---------+
|2024-03-19 12:00:00|       25.3|   1012.5|
|2024-03-19 12:01:00|       26.1|   1011.8|
|2024-03-19 12:02:00|      100.0|    950.0|  # 이상값
|2024-03-19 12:03:00|       24.8|   1013.2|
+-------------------+-----------+---------+

3️⃣ 이상 탐지를 위한 통계적 기법 (Z-score, IQR)

📌 Z-score를 활용한 이상 탐지

from pyspark.sql.functions import mean, stddev

stats = df.select(mean("temperature").alias("mean"), stddev("temperature").alias("stddev")).collect()[0]

mean_temp, stddev_temp = stats["mean"], stats["stddev"]

df = df.withColumn("z_score", (df["temperature"] - mean_temp) / stddev_temp)
df.filter(df["z_score"] > 3).show()  # Z-score > 3인 데이터 출력

 

📌 출력 예시

+-------------------+-----------+---------+--------+
|          timestamp|temperature|pressure |z_score |
+-------------------+-----------+---------+--------+
|2024-03-19 12:02:00|      100.0|    950.0|   4.92 |
+-------------------+-----------+---------+--------+

 

📌 IQR을 활용한 이상 탐지

from pyspark.sql.functions import expr

q1, q3 = df.approxQuantile("temperature", [0.25, 0.75], 0.0)
iqr = q3 - q1

df.filter(expr(f"temperature < {q1 - 1.5 * iqr} OR temperature > {q3 + 1.5 * iqr}")).show()

4️⃣ Isolation Forest를 활용한 머신러닝 기반 이상 탐지

📌 Feature 변환 (VectorAssembler 활용)

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["temperature", "pressure"], outputCol="features")
df_transformed = assembler.transform(df)

 

📌 Isolation Forest 모델 학습

from pyspark.ml.classification import RandomForestClassifier

# 랜덤 포레스트를 이상 탐지 모델로 활용
model = RandomForestClassifier(featuresCol="features", labelCol="anomaly", numTrees=50)
trained_model = model.fit(df_transformed)

predictions = trained_model.transform(df_transformed)
predictions.select("features", "anomaly", "prediction").show()

 

📌 출력 예시 (이상값 예측)

+----------------+-------+----------+
|features        |anomaly|prediction|
+----------------+-------+----------+
|[25.3,1012.5]  |   0   |    0     |
|[100.0,950.0]  |   1   |    1     |  # 이상값 탐지됨
+----------------+-------+----------+

5️⃣ Spark Streaming을 활용한 실시간 이상 탐지

📌 Kafka에서 실시간 데이터 수집

from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(spark.sparkContext, 5)

kafkaStream = KafkaUtils.createDirectStream(ssc, ["sensor"], {"metadata.broker.list": "localhost:9092"})
sensor_data = kafkaStream.map(lambda x: x[1])

# 이상 탐지 로직 적용
def detect_anomalies(rdd):
    if not rdd.isEmpty():
        df = spark.createDataFrame(rdd.map(lambda x: (float(x.split(",")[1]), float(x.split(",")[2]))),
                                   ["temperature", "pressure"])
        df = assembler.transform(df)
        predictions = trained_model.transform(df)
        anomalies = predictions.filter(predictions["prediction"] == 1)
        anomalies.show()

sensor_data.foreachRDD(detect_anomalies)

ssc.start()
ssc.awaitTermination()

📌 Kafka에 센서 데이터 입력 시 실시간으로 이상 탐지


📌 실전 문제: Spark MLlib을 활용한 이상 탐지 연습하기


문제 1: Spark DataFrame을 로드하고 데이터 확인하기

📌 CSV 데이터를 읽고 상위 5개 데이터를 출력하세요.

# 🔽 여기에 코드 작성
df = spark.read.csv("sensor_data.csv", header=True, inferSchema=True)
df.show(5)

문제 2: Z-score를 활용하여 이상 탐지하기

📌 Z-score를 계산하여 이상값을 탐지하세요.

# 🔽 여기에 코드 작성
from pyspark.sql.functions import mean, stddev

stats = df.select(mean("temperature").alias("mean"), stddev("temperature").alias("stddev")).collect()[0]
mean_temp, stddev_temp = stats["mean"], stats["stddev"]

df = df.withColumn("z_score", (df["temperature"] - mean_temp) / stddev_temp)
df.filter(df["z_score"] > 3).show()

문제 3: 머신러닝을 활용한 이상 탐지

📌 Isolation Forest 모델을 사용하여 이상값을 예측하세요.

# 🔽 여기에 코드 작성
from pyspark.ml.classification import RandomForestClassifier

model = RandomForestClassifier(featuresCol="features", labelCol="anomaly", numTrees=50)
trained_model = model.fit(df_transformed)

predictions = trained_model.transform(df_transformed)
predictions.select("features", "anomaly", "prediction").show()

 

728x90