728x90
이 글에서 다루는 개념
Apache Spark의 MLlib을 활용하여 대량의 데이터를 분석하고 이상값(Anomaly)을 탐지하는 방법을 배웁니다.
이번 글에서는 다음 내용을 학습합니다.
- 이상 탐지(Anomaly Detection) 개념
- PySpark MLlib을 활용한 이상 탐지 모델 구축
- 이상 탐지를 위한 통계적 기법 및 머신러닝 기법 비교
- Spark Streaming을 활용한 실시간 이상 탐지
1️⃣ 이상 탐지(Anomaly Detection)란?
📌 이상 탐지란?
- 정상적인 패턴에서 벗어난 데이터를 식별하는 기술
- 금융 사기 탐지, 네트워크 보안, 산업 장비 이상 감지, 품질 관리 등에 활용
📌 이상 탐지 기법 비교
방식 | 설명 |
통계적 기법 | 평균, 표준편차를 활용한 이상 탐지 (IQR, Z-score 등) |
머신러닝 기반 | 이상 탐지 모델 학습 (Isolation Forest, One-Class SVM 등) |
딥러닝 기반 | Autoencoder, GAN을 활용한 이상 탐지 |
📌 PySpark에서는 MLlib을 활용하여 이상 탐지 모델을 구현 가능
2️⃣ PySpark MLlib 환경 설정
📌 SparkSession 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AnomalyDetection").getOrCreate()
print("Spark MLlib 환경 설정 완료!")
📌 데이터 로드 (sensor_data.csv)
timestamp,temperature,pressure
2024-03-19 12:00:00,25.3,1012.5
2024-03-19 12:01:00,26.1,1011.8
2024-03-19 12:02:00,100.0,950.0 # 이상값
2024-03-19 12:03:00,24.8,1013.2
df = spark.read.csv("sensor_data.csv", header=True, inferSchema=True)
df.show()
📌 출력 예시
+-------------------+-----------+---------+
| timestamp|temperature|pressure |
+-------------------+-----------+---------+
|2024-03-19 12:00:00| 25.3| 1012.5|
|2024-03-19 12:01:00| 26.1| 1011.8|
|2024-03-19 12:02:00| 100.0| 950.0| # 이상값
|2024-03-19 12:03:00| 24.8| 1013.2|
+-------------------+-----------+---------+
3️⃣ 이상 탐지를 위한 통계적 기법 (Z-score, IQR)
📌 Z-score를 활용한 이상 탐지
from pyspark.sql.functions import mean, stddev
stats = df.select(mean("temperature").alias("mean"), stddev("temperature").alias("stddev")).collect()[0]
mean_temp, stddev_temp = stats["mean"], stats["stddev"]
df = df.withColumn("z_score", (df["temperature"] - mean_temp) / stddev_temp)
df.filter(df["z_score"] > 3).show() # Z-score > 3인 데이터 출력
📌 출력 예시
+-------------------+-----------+---------+--------+
| timestamp|temperature|pressure |z_score |
+-------------------+-----------+---------+--------+
|2024-03-19 12:02:00| 100.0| 950.0| 4.92 |
+-------------------+-----------+---------+--------+
📌 IQR을 활용한 이상 탐지
from pyspark.sql.functions import expr
q1, q3 = df.approxQuantile("temperature", [0.25, 0.75], 0.0)
iqr = q3 - q1
df.filter(expr(f"temperature < {q1 - 1.5 * iqr} OR temperature > {q3 + 1.5 * iqr}")).show()
4️⃣ Isolation Forest를 활용한 머신러닝 기반 이상 탐지
📌 Feature 변환 (VectorAssembler 활용)
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperature", "pressure"], outputCol="features")
df_transformed = assembler.transform(df)
📌 Isolation Forest 모델 학습
from pyspark.ml.classification import RandomForestClassifier
# 랜덤 포레스트를 이상 탐지 모델로 활용
model = RandomForestClassifier(featuresCol="features", labelCol="anomaly", numTrees=50)
trained_model = model.fit(df_transformed)
predictions = trained_model.transform(df_transformed)
predictions.select("features", "anomaly", "prediction").show()
📌 출력 예시 (이상값 예측)
+----------------+-------+----------+
|features |anomaly|prediction|
+----------------+-------+----------+
|[25.3,1012.5] | 0 | 0 |
|[100.0,950.0] | 1 | 1 | # 이상값 탐지됨
+----------------+-------+----------+
5️⃣ Spark Streaming을 활용한 실시간 이상 탐지
📌 Kafka에서 실시간 데이터 수집
from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(spark.sparkContext, 5)
kafkaStream = KafkaUtils.createDirectStream(ssc, ["sensor"], {"metadata.broker.list": "localhost:9092"})
sensor_data = kafkaStream.map(lambda x: x[1])
# 이상 탐지 로직 적용
def detect_anomalies(rdd):
if not rdd.isEmpty():
df = spark.createDataFrame(rdd.map(lambda x: (float(x.split(",")[1]), float(x.split(",")[2]))),
["temperature", "pressure"])
df = assembler.transform(df)
predictions = trained_model.transform(df)
anomalies = predictions.filter(predictions["prediction"] == 1)
anomalies.show()
sensor_data.foreachRDD(detect_anomalies)
ssc.start()
ssc.awaitTermination()
📌 Kafka에 센서 데이터 입력 시 실시간으로 이상 탐지
📌 실전 문제: Spark MLlib을 활용한 이상 탐지 연습하기
✅ 문제 1: Spark DataFrame을 로드하고 데이터 확인하기
📌 CSV 데이터를 읽고 상위 5개 데이터를 출력하세요.
# 🔽 여기에 코드 작성
df = spark.read.csv("sensor_data.csv", header=True, inferSchema=True)
df.show(5)
✅ 문제 2: Z-score를 활용하여 이상 탐지하기
📌 Z-score를 계산하여 이상값을 탐지하세요.
# 🔽 여기에 코드 작성
from pyspark.sql.functions import mean, stddev
stats = df.select(mean("temperature").alias("mean"), stddev("temperature").alias("stddev")).collect()[0]
mean_temp, stddev_temp = stats["mean"], stats["stddev"]
df = df.withColumn("z_score", (df["temperature"] - mean_temp) / stddev_temp)
df.filter(df["z_score"] > 3).show()
✅ 문제 3: 머신러닝을 활용한 이상 탐지
📌 Isolation Forest 모델을 사용하여 이상값을 예측하세요.
# 🔽 여기에 코드 작성
from pyspark.ml.classification import RandomForestClassifier
model = RandomForestClassifier(featuresCol="features", labelCol="anomaly", numTrees=50)
trained_model = model.fit(df_transformed)
predictions = trained_model.transform(df_transformed)
predictions.select("features", "anomaly", "prediction").show()
728x90
'Data Engineering > python' 카테고리의 다른 글
EP29 | 고급 Python 활용 #18 | Spark와 Delta Lake를 활용한 데이터 레이크 아키텍처 (0) | 2025.03.19 |
---|---|
EP28 | 고급 Python 활용 #17 | Spark와 Hadoop을 결합한 대규모 데이터 분석 아키텍처 (0) | 2025.03.19 |
EP26 | 고급 Python 활용 #15 | Spark를 활용한 실시간 데이터 파이프라인 구축 (0) | 2025.03.19 |
EP25 | 고급 Python 활용 #14 | Spark를 활용한 실시간 추천 시스템 구축 (0) | 2025.03.19 |
EP24 | 고급 Python 활용 #13 | Spark GraphX를 활용한 그래프 데이터 분석 (0) | 2025.03.19 |