Data Engineering/python

EP23 | 고급 Python 활용 #12 | Spark MLlib을 활용한 머신러닝 데이터 분석

ygtoken 2025. 3. 19. 23:11
728x90

이 글에서 다루는 개념

Apache Spark의 MLlib은 대규모 데이터에서 머신러닝 모델을 학습하고 예측하는 라이브러리입니다.
이번 글에서는 다음 내용을 학습합니다.

  • Spark MLlib 개념 및 아키텍처
  • PySpark를 활용한 데이터 전처리
  • 선형 회귀(Linear Regression) 모델 학습
  • 랜덤 포레스트(Random Forest) 모델 활용
  • Spark ML Pipelines을 사용한 머신러닝 워크플로우 구성

1️⃣ Spark MLlib이란?

📌 Spark MLlib이란?

  • 대용량 데이터를 병렬 분산 처리하여 머신러닝을 수행하는 Spark 라이브러리
  • 데이터프레임 기반의 ML API(pyspark.ml) 제공

📌 MLlib에서 지원하는 주요 알고리즘

모델  설명
선형 회귀 연속형 값을 예측하는 모델
로지스틱 회귀 분류(Classification) 문제 해결
랜덤 포레스트 트리 기반 분류/회귀 모델
K-Means 비지도 학습 클러스터링
PCA 차원 축소(Principal Component Analysis)

2️⃣ PySpark MLlib 환경 설정

📌 설치 (pyspark 포함됨)

pip install pyspark

 

📌 SparkSession 생성 (PySpark ML 사용을 위한 필수 설정)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MLlibExample").getOrCreate()
print("Spark MLlib 환경 설정 완료!")

3️⃣ 데이터 불러오기 및 전처리

📌 CSV 데이터 불러오기

df = spark.read.csv("data/housing.csv", header=True, inferSchema=True)
df.show(5)

 

📌 출력 예시

+-----+-----+------+
| size|rooms|price|
+-----+-----+------+
| 1000|    2| 300000|
| 1200|    3| 350000|
| 1500|    3| 400000|
+-----+-----+------+

🔹 데이터 변환 (Feature Engineering)

📌 Spark MLlib에서는 모든 입력 변수(독립 변수)를 하나의 벡터(Vector)로 변환해야 함

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["size", "rooms"], outputCol="features")
df_transformed = assembler.transform(df)

df_transformed.select("features", "price").show(5)

 

📌 출력 예시

+-------------+------+
|     features| price|
+-------------+------+
|[1000.0, 2.0]|300000|
|[1200.0, 3.0]|350000|
|[1500.0, 3.0]|400000|
+-------------+------+

4️⃣ 선형 회귀 모델 학습 (Linear Regression)

📌 학습 데이터 분할 (Train/Test Split)

train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)

 

📌 선형 회귀 모델 학습

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train_data)

print("모델 학습 완료!")

 

📌 모델 계수 출력

print(f"회귀 계수 (Coefficients): {model.coefficients}")
print(f"절편 (Intercept): {model.intercept}")

📌 모델 예측 및 평가

predictions = model.transform(test_data)
predictions.select("features", "price", "prediction").show(5)

 

📌 평가 지표 (RMSE, R2 Score)

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"RMSE (Root Mean Squared Error): {rmse}")

5️⃣ 랜덤 포레스트 모델 학습 (Random Forest)

📌 랜덤 포레스트 회귀 모델 적용

from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="features", labelCol="price", numTrees=10)
rf_model = rf.fit(train_data)

print("랜덤 포레스트 모델 학습 완료!")

 

📌 랜덤 포레스트 예측 및 평가

rf_predictions = rf_model.transform(test_data)
rf_predictions.select("features", "price", "prediction").show(5)

 

📌 RMSE 계산

rf_rmse = evaluator.evaluate(rf_predictions)
print(f"랜덤 포레스트 RMSE: {rf_rmse}")

6️⃣ Spark ML Pipelines을 활용한 머신러닝 자동화

📌 Pipeline을 사용하면 데이터 전처리 → 학습 → 예측 단계를 한 번에 처리 가능

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, lr])  # 선형 회귀 모델을 포함한 파이프라인 생성
pipeline_model = pipeline.fit(train_data)  # 전체 파이프라인 학습
predictions = pipeline_model.transform(test_data)  # 예측 실행

predictions.select("features", "price", "prediction").show(5)

📌 실전 문제: PySpark MLlib 머신러닝 모델 연습하기


문제 1: Spark DataFrame을 로드하고 데이터 확인하기

📌 CSV 데이터를 읽고 상위 5개 데이터를 출력하세요.

# 🔽 여기에 코드 작성
df = spark.read.csv("data/housing.csv", header=True, inferSchema=True)
df.show(5)

문제 2: 데이터 변환 (VectorAssembler 사용)

📌 "size"와 "rooms"를 하나의 벡터로 변환하세요.

from pyspark.ml.feature import VectorAssembler
# 🔽 여기에 코드 작성
assembler = VectorAssembler(inputCols=["size", "rooms"], outputCol="features")
df_transformed = assembler.transform(df)

df_transformed.select("features", "price").show(5)

문제 3: 선형 회귀 모델 학습 및 평가

📌 선형 회귀 모델을 학습하고 RMSE 값을 출력하세요.

from pyspark.ml.regression import LinearRegression
# 🔽 여기에 코드 작성
lr = LinearRegression(featuresCol="features", labelCol="price")
model = lr.fit(train_data)

predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

문제 4: 랜덤 포레스트 모델 학습 및 평가

📌 랜덤 포레스트 회귀 모델을 학습하고 RMSE 값을 출력하세요.

from pyspark.ml.regression import RandomForestRegressor
# 🔽 여기에 코드 작성
rf = RandomForestRegressor(featuresCol="features", labelCol="price", numTrees=10)
rf_model = rf.fit(train_data)

rf_predictions = rf_model.transform(test_data)
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"랜덤 포레스트 RMSE: {rf_rmse}")

 

728x90