Data Engineering/python

EP30 | 고급 Python 활용 #19 | Spark를 활용한 데이터 엔지니어링 Best Practices

ygtoken 2025. 3. 19. 23:24
728x90

이 글에서 다루는 개념

Apache Spark를 활용한 데이터 엔지니어링에서 성능 최적화, 유지보수성 향상, 안정적인 데이터 처리를 위한 Best Practices를 다룹니다.
이번 글에서는 다음 내용을 학습합니다.

  • Spark 데이터 엔지니어링 Best Practices 개요
  • 데이터 읽기/쓰기 성능 최적화
  • 메모리 및 실행 최적화 (Shuffle, Partitioning)
  • 모니터링 및 디버깅 기법
  • 데이터 파이프라인 유지보수 전략

1️⃣ Spark 데이터 엔지니어링 Best Practices 개요

📌 Spark 성능 최적화를 위한 핵심 전략

최적화 대상  주요 기법
데이터 읽기/쓰기 CSV 대신 Parquet/ORC 포맷 사용
실행 계획 최적화 cache(), persist() 사용
메모리 최적화 적절한 repartition(), coalesce() 적용
쿼리 성능 향상 Catalyst Optimizer 활용
데이터 분산 균형 잡힌 Partitioning 수행

2️⃣ 데이터 읽기/쓰기 성능 최적화

📌 CSV 대신 Parquet/ORC 사용하기

df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.write.parquet("output.parquet")  # 성능 최적화
  • Parquet/ORC는 컬럼 단위 저장 방식을 사용하여 읽기 성능 향상
  • 압축률이 높아 저장 공간 절약

📌 압축 옵션 활용 (snappy, gzip, zstd)

df.write.parquet("output.parquet", compression="snappy")
  • snappy: 빠른 압축 및 해제 (기본값)
  • gzip: 높은 압축률, 하지만 느림
  • zstd: 빠르고 압축률이 우수

📌 병렬 데이터 쓰기 (mode="overwrite")

df.write.mode("overwrite").parquet("output.parquet")
  • overwrite → 기존 데이터 삭제 후 새 데이터 저장
  • append → 기존 데이터에 추가

3️⃣ 메모리 및 실행 최적화 (Shuffle, Partitioning)

📌 적절한 Partitioning 적용 (repartition(), coalesce())

df = df.repartition(8)  # 파티션 개수 조정 (높은 병렬성 제공)
df = df.coalesce(4)  # 작은 데이터셋일 경우 파티션 개수 줄이기
  • repartition(N): 전체 데이터 다시 분배 (Shuffle 발생)
  • coalesce(N): 기존 Partition 유지, 일부만 병합 (Shuffle 최소화)

📌 Shuffle 최소화 (groupBy() → reduceByKey())

# 비효율적인 방식 (Shuffle 발생)
df.groupBy("category").count().show()

# 효율적인 방식 (Shuffle 최소화)
rdd = df.rdd.map(lambda x: (x.category, 1))
result = rdd.reduceByKey(lambda a, b: a + b)
result.collect()

 

📌 BroadCast Join 활용 (broadcast())

from pyspark.sql.functions import broadcast

small_df = spark.read.csv("small_lookup.csv", header=True, inferSchema=True)
large_df = spark.read.parquet("large_dataset.parquet")

# BroadCast Join 적용
result = large_df.join(broadcast(small_df), "id")
  • 작은 테이블을 Broadcast하여 전체 노드에서 공유 (Shuffle 감소)

4️⃣ 모니터링 및 디버깅 기법

📌 Spark UI 활용 (4040 포트 기본 제공)

spark-submit --master local my_spark_script.py

📌 쿼리 실행 계획 확인 (explain())

df.groupBy("category").count().explain(True)
  • == Physical Plan == 확인하여 Unnecessary Shuffle, Broadcast Join 여부 체크

📌 Spark 로그 설정 (log4j.properties 변경)

log4j.rootCategory=INFO, console
log4j.logger.org.apache.spark=ERROR
  • INFO → ERROR로 변경하여 불필요한 로그 감소

5️⃣ 데이터 파이프라인 유지보수 전략

📌 데이터 검증 및 품질 관리 (assert())

assert df.filter(df["id"].isNull()).count() == 0, "ID 컬럼에 NULL 값이 존재합니다!"
  • 데이터 정합성 검증을 위해 NULL 값, 중복 값 체크

📌 Checkpoints 활용 (checkpoint())

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df.checkpoint()
  • RDD 재계산 방지 및 DAG 최적화

📌 자동화된 데이터 파이프라인 (Airflow + Spark)

airflow dags trigger my_spark_pipeline
  • Apache Airflow를 활용하여 Spark 작업 자동 실행

📌 실전 문제: Spark 데이터 엔지니어링 최적화 연습하기


문제 1: CSV 데이터를 Parquet으로 변환 후 저장

📌 CSV 데이터를 읽고, 압축된 Parquet 포맷으로 저장하세요.

# 🔽 여기에 코드 작성
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.write.parquet("output.parquet", compression="snappy")

문제 2: repartition()과 coalesce()의 차이점 확인

📌 8개의 파티션을 가진 DataFrame을 4개로 줄일 때 repartition()과 coalesce()의 차이를 확인하세요.

# 🔽 여기에 코드 작성
df = df.repartition(8)  # 파티션 개수 8개로 설정
df_repartitioned = df.repartition(4)  # 전체 재분배 (Shuffle 발생)
df_coalesced = df.coalesce(4)  # 기존 Partition 유지 (Shuffle 최소화)

print(f"repartition 개수: {df_repartitioned.rdd.getNumPartitions()}")
print(f"coalesce 개수: {df_coalesced.rdd.getNumPartitions()}")

문제 3: Broadcast Join 적용하기

📌 작은 데이터셋을 Broadcast하여 Join 최적화하세요.

from pyspark.sql.functions import broadcast
# 🔽 여기에 코드 작성
small_df = spark.read.csv("small_lookup.csv", header=True, inferSchema=True)
large_df = spark.read.parquet("large_dataset.parquet")

result = large_df.join(broadcast(small_df), "id")

문제 4: Spark UI에서 실행 계획 조회 (explain())

📌 Spark SQL 실행 계획을 확인하세요.

# 🔽 여기에 코드 작성
df.groupBy("category").count().explain(True)

 

 

728x90