728x90
이 글에서 다루는 개념
Apache Spark를 활용한 데이터 엔지니어링에서 성능 최적화, 유지보수성 향상, 안정적인 데이터 처리를 위한 Best Practices를 다룹니다.
이번 글에서는 다음 내용을 학습합니다.
- Spark 데이터 엔지니어링 Best Practices 개요
- 데이터 읽기/쓰기 성능 최적화
- 메모리 및 실행 최적화 (Shuffle, Partitioning)
- 모니터링 및 디버깅 기법
- 데이터 파이프라인 유지보수 전략
1️⃣ Spark 데이터 엔지니어링 Best Practices 개요
📌 Spark 성능 최적화를 위한 핵심 전략
최적화 대상 | 주요 기법 |
데이터 읽기/쓰기 | CSV 대신 Parquet/ORC 포맷 사용 |
실행 계획 최적화 | cache(), persist() 사용 |
메모리 최적화 | 적절한 repartition(), coalesce() 적용 |
쿼리 성능 향상 | Catalyst Optimizer 활용 |
데이터 분산 | 균형 잡힌 Partitioning 수행 |
2️⃣ 데이터 읽기/쓰기 성능 최적화
📌 CSV 대신 Parquet/ORC 사용하기
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.write.parquet("output.parquet") # 성능 최적화
- Parquet/ORC는 컬럼 단위 저장 방식을 사용하여 읽기 성능 향상
- 압축률이 높아 저장 공간 절약
📌 압축 옵션 활용 (snappy, gzip, zstd)
df.write.parquet("output.parquet", compression="snappy")
- snappy: 빠른 압축 및 해제 (기본값)
- gzip: 높은 압축률, 하지만 느림
- zstd: 빠르고 압축률이 우수
📌 병렬 데이터 쓰기 (mode="overwrite")
df.write.mode("overwrite").parquet("output.parquet")
- overwrite → 기존 데이터 삭제 후 새 데이터 저장
- append → 기존 데이터에 추가
3️⃣ 메모리 및 실행 최적화 (Shuffle, Partitioning)
📌 적절한 Partitioning 적용 (repartition(), coalesce())
df = df.repartition(8) # 파티션 개수 조정 (높은 병렬성 제공)
df = df.coalesce(4) # 작은 데이터셋일 경우 파티션 개수 줄이기
- repartition(N): 전체 데이터 다시 분배 (Shuffle 발생)
- coalesce(N): 기존 Partition 유지, 일부만 병합 (Shuffle 최소화)
📌 Shuffle 최소화 (groupBy() → reduceByKey())
# 비효율적인 방식 (Shuffle 발생)
df.groupBy("category").count().show()
# 효율적인 방식 (Shuffle 최소화)
rdd = df.rdd.map(lambda x: (x.category, 1))
result = rdd.reduceByKey(lambda a, b: a + b)
result.collect()
📌 BroadCast Join 활용 (broadcast())
from pyspark.sql.functions import broadcast
small_df = spark.read.csv("small_lookup.csv", header=True, inferSchema=True)
large_df = spark.read.parquet("large_dataset.parquet")
# BroadCast Join 적용
result = large_df.join(broadcast(small_df), "id")
- 작은 테이블을 Broadcast하여 전체 노드에서 공유 (Shuffle 감소)
4️⃣ 모니터링 및 디버깅 기법
📌 Spark UI 활용 (4040 포트 기본 제공)
spark-submit --master local my_spark_script.py
- http://localhost:4040 접속 후 DAG, Stage, Task 모니터링
📌 쿼리 실행 계획 확인 (explain())
df.groupBy("category").count().explain(True)
- == Physical Plan == 확인하여 Unnecessary Shuffle, Broadcast Join 여부 체크
📌 Spark 로그 설정 (log4j.properties 변경)
log4j.rootCategory=INFO, console
log4j.logger.org.apache.spark=ERROR
- INFO → ERROR로 변경하여 불필요한 로그 감소
5️⃣ 데이터 파이프라인 유지보수 전략
📌 데이터 검증 및 품질 관리 (assert())
assert df.filter(df["id"].isNull()).count() == 0, "ID 컬럼에 NULL 값이 존재합니다!"
- 데이터 정합성 검증을 위해 NULL 값, 중복 값 체크
📌 Checkpoints 활용 (checkpoint())
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df.checkpoint()
- RDD 재계산 방지 및 DAG 최적화
📌 자동화된 데이터 파이프라인 (Airflow + Spark)
airflow dags trigger my_spark_pipeline
- Apache Airflow를 활용하여 Spark 작업 자동 실행
📌 실전 문제: Spark 데이터 엔지니어링 최적화 연습하기
✅ 문제 1: CSV 데이터를 Parquet으로 변환 후 저장
📌 CSV 데이터를 읽고, 압축된 Parquet 포맷으로 저장하세요.
# 🔽 여기에 코드 작성
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.write.parquet("output.parquet", compression="snappy")
✅ 문제 2: repartition()과 coalesce()의 차이점 확인
📌 8개의 파티션을 가진 DataFrame을 4개로 줄일 때 repartition()과 coalesce()의 차이를 확인하세요.
# 🔽 여기에 코드 작성
df = df.repartition(8) # 파티션 개수 8개로 설정
df_repartitioned = df.repartition(4) # 전체 재분배 (Shuffle 발생)
df_coalesced = df.coalesce(4) # 기존 Partition 유지 (Shuffle 최소화)
print(f"repartition 개수: {df_repartitioned.rdd.getNumPartitions()}")
print(f"coalesce 개수: {df_coalesced.rdd.getNumPartitions()}")
✅ 문제 3: Broadcast Join 적용하기
📌 작은 데이터셋을 Broadcast하여 Join 최적화하세요.
from pyspark.sql.functions import broadcast
# 🔽 여기에 코드 작성
small_df = spark.read.csv("small_lookup.csv", header=True, inferSchema=True)
large_df = spark.read.parquet("large_dataset.parquet")
result = large_df.join(broadcast(small_df), "id")
✅ 문제 4: Spark UI에서 실행 계획 조회 (explain())
📌 Spark SQL 실행 계획을 확인하세요.
# 🔽 여기에 코드 작성
df.groupBy("category").count().explain(True)
728x90
'Data Engineering > python' 카테고리의 다른 글
EP31 | 고급 Python 활용 #20 | Spark 기반 데이터 파이프라인 배포 전략 (0) | 2025.03.19 |
---|---|
EP29 | 고급 Python 활용 #18 | Spark와 Delta Lake를 활용한 데이터 레이크 아키텍처 (0) | 2025.03.19 |
EP28 | 고급 Python 활용 #17 | Spark와 Hadoop을 결합한 대규모 데이터 분석 아키텍처 (0) | 2025.03.19 |
EP27 | 고급 Python 활용 #16 | Spark를 활용한 머신러닝 기반 이상 탐지 시스템 구축 (0) | 2025.03.19 |
EP26 | 고급 Python 활용 #15 | Spark를 활용한 실시간 데이터 파이프라인 구축 (0) | 2025.03.19 |