728x90
이 글에서 다루는 개념
Apache Spark의 MLlib은 대규모 데이터에서 머신러닝 모델을 학습하고 예측하는 라이브러리입니다.
이번 글에서는 다음 내용을 학습합니다.
- Spark MLlib 개념 및 아키텍처
- PySpark를 활용한 데이터 전처리
- 선형 회귀(Linear Regression) 모델 학습
- 랜덤 포레스트(Random Forest) 모델 활용
- Spark ML Pipelines을 사용한 머신러닝 워크플로우 구성
1️⃣ Spark MLlib이란?
📌 Spark MLlib이란?
- 대용량 데이터를 병렬 분산 처리하여 머신러닝을 수행하는 Spark 라이브러리
- 데이터프레임 기반의 ML API(pyspark.ml) 제공
📌 MLlib에서 지원하는 주요 알고리즘
모델 | 설명 |
선형 회귀 | 연속형 값을 예측하는 모델 |
로지스틱 회귀 | 분류(Classification) 문제 해결 |
랜덤 포레스트 | 트리 기반 분류/회귀 모델 |
K-Means | 비지도 학습 클러스터링 |
PCA | 차원 축소(Principal Component Analysis) |
2️⃣ PySpark MLlib 환경 설정
📌 설치 (pyspark 포함됨)
pip install pyspark
📌 SparkSession 생성 (PySpark ML 사용을 위한 필수 설정)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MLlibExample").getOrCreate()
print("Spark MLlib 환경 설정 완료!")
3️⃣ 데이터 불러오기 및 전처리
📌 CSV 데이터 불러오기
df = spark.read.csv("data/housing.csv", header=True, inferSchema=True)
df.show(5)
📌 출력 예시
+-----+-----+------+
| size|rooms|price|
+-----+-----+------+
| 1000| 2| 300000|
| 1200| 3| 350000|
| 1500| 3| 400000|
+-----+-----+------+
🔹 데이터 변환 (Feature Engineering)
📌 Spark MLlib에서는 모든 입력 변수(독립 변수)를 하나의 벡터(Vector)로 변환해야 함
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["size", "rooms"], outputCol="features")
df_transformed = assembler.transform(df)
df_transformed.select("features", "price").show(5)
📌 출력 예시
+-------------+------+
| features| price|
+-------------+------+
|[1000.0, 2.0]|300000|
|[1200.0, 3.0]|350000|
|[1500.0, 3.0]|400000|
+-------------+------+
4️⃣ 선형 회귀 모델 학습 (Linear Regression)
📌 학습 데이터 분할 (Train/Test Split)
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)
📌 선형 회귀 모델 학습
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train_data)
print("모델 학습 완료!")
📌 모델 계수 출력
print(f"회귀 계수 (Coefficients): {model.coefficients}")
print(f"절편 (Intercept): {model.intercept}")
📌 모델 예측 및 평가
predictions = model.transform(test_data)
predictions.select("features", "price", "prediction").show(5)
📌 평가 지표 (RMSE, R2 Score)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE (Root Mean Squared Error): {rmse}")
5️⃣ 랜덤 포레스트 모델 학습 (Random Forest)
📌 랜덤 포레스트 회귀 모델 적용
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="price", numTrees=10)
rf_model = rf.fit(train_data)
print("랜덤 포레스트 모델 학습 완료!")
📌 랜덤 포레스트 예측 및 평가
rf_predictions = rf_model.transform(test_data)
rf_predictions.select("features", "price", "prediction").show(5)
📌 RMSE 계산
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"랜덤 포레스트 RMSE: {rf_rmse}")
6️⃣ Spark ML Pipelines을 활용한 머신러닝 자동화
📌 Pipeline을 사용하면 데이터 전처리 → 학습 → 예측 단계를 한 번에 처리 가능
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, lr]) # 선형 회귀 모델을 포함한 파이프라인 생성
pipeline_model = pipeline.fit(train_data) # 전체 파이프라인 학습
predictions = pipeline_model.transform(test_data) # 예측 실행
predictions.select("features", "price", "prediction").show(5)
📌 실전 문제: PySpark MLlib 머신러닝 모델 연습하기
✅ 문제 1: Spark DataFrame을 로드하고 데이터 확인하기
📌 CSV 데이터를 읽고 상위 5개 데이터를 출력하세요.
# 🔽 여기에 코드 작성
df = spark.read.csv("data/housing.csv", header=True, inferSchema=True)
df.show(5)
✅ 문제 2: 데이터 변환 (VectorAssembler 사용)
📌 "size"와 "rooms"를 하나의 벡터로 변환하세요.
from pyspark.ml.feature import VectorAssembler
# 🔽 여기에 코드 작성
assembler = VectorAssembler(inputCols=["size", "rooms"], outputCol="features")
df_transformed = assembler.transform(df)
df_transformed.select("features", "price").show(5)
✅ 문제 3: 선형 회귀 모델 학습 및 평가
📌 선형 회귀 모델을 학습하고 RMSE 값을 출력하세요.
from pyspark.ml.regression import LinearRegression
# 🔽 여기에 코드 작성
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train_data)
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
✅ 문제 4: 랜덤 포레스트 모델 학습 및 평가
📌 랜덤 포레스트 회귀 모델을 학습하고 RMSE 값을 출력하세요.
from pyspark.ml.regression import RandomForestRegressor
# 🔽 여기에 코드 작성
rf = RandomForestRegressor(featuresCol="features", labelCol="price", numTrees=10)
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"랜덤 포레스트 RMSE: {rf_rmse}")
728x90
'Data Engineering > python' 카테고리의 다른 글
EP25 | 고급 Python 활용 #14 | Spark를 활용한 실시간 추천 시스템 구축 (0) | 2025.03.19 |
---|---|
EP24 | 고급 Python 활용 #13 | Spark GraphX를 활용한 그래프 데이터 분석 (0) | 2025.03.19 |
EP22 | 고급 Python 활용 #11 | Spark Streaming을 활용한 실시간 데이터 처리 (0) | 2025.03.19 |
EP21 | 고급 Python 활용 #10 | Spark와 Python을 활용한 대용량 데이터 처리 (0) | 2025.03.19 |
EP20 | 고급 Python 활용 #9 | 데이터 자동화 및 작업 스케줄링 (Airflow) (0) | 2025.03.19 |