728x90
이 글에서 다루는 개념
Delta Lake는 ACID 트랜잭션을 지원하는 확장 가능한 데이터 레이크 솔루션입니다.
이번 글에서는 다음 내용을 학습합니다.
- 데이터 레이크(Data Lake) 개념 및 Delta Lake의 필요성
- Spark와 Delta Lake 연동하여 데이터 저장 및 관리
- Delta Lake의 ACID 트랜잭션 및 데이터 버전 관리
- Schema Evolution을 활용한 데이터 변경 처리
- Spark SQL을 활용한 Delta 테이블 분석
1️⃣ 데이터 레이크(Data Lake)와 Delta Lake 개념
📌 데이터 레이크란?
- 원시 데이터(정형, 비정형)를 그대로 저장하여 분석할 수 있는 저장소
- 대규모 데이터 저장이 가능하지만, 데이터 무결성(ACID 보장)이 부족
📌 Delta Lake란?
- Apache Spark 기반의 트랜잭션 데이터 레이크 솔루션
- ACID 트랜잭션을 지원하여 안정적인 데이터 분석 가능
- Schema Evolution 기능을 통해 스키마 변경을 유연하게 처리
📌 Delta Lake vs 기존 데이터 레이크 비교
항목 | 기존 데이터 레이크 | Delta Lake |
ACID 트랜잭션 | ❌ 지원 안 함 | ✅ 지원 |
스키마 변경 | 제한적 | 유연한 Schema Evolution |
데이터 버전 관리 | ❌ 없음 | ✅ 지원 (Time Travel) |
성능 최적화 | 낮음 | ✅ Delta Caching 지원 |
2️⃣ Spark와 Delta Lake 연동하기
📌 Delta Lake 라이브러리 설치
pip install delta-spark
📌 SparkSession 생성 (Delta Lake 지원)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
📌 Delta Lake 사용을 위한 Spark 설정
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
3️⃣ Delta Lake 데이터 저장 및 관리
📌 Delta 테이블로 데이터 저장
from pyspark.sql import Row
data = [
Row(id=1, name="Alice", age=25),
Row(id=2, name="Bob", age=30),
Row(id=3, name="Charlie", age=35),
]
df = spark.createDataFrame(data)
# Delta 테이블로 저장
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")
📌 Delta 테이블 읽기
df_delta = spark.read.format("delta").load("/tmp/delta-table")
df_delta.show()
📌 출력 예시
+---+-------+---+
| id| name |age|
+---+-------+---+
| 1| Alice | 25|
| 2| Bob | 30|
| 3|Charlie| 35|
+---+-------+---+
4️⃣ Delta Lake의 ACID 트랜잭션 및 데이터 버전 관리
📌 새로운 데이터 추가 (append 모드)
new_data = [
Row(id=4, name="David", age=40),
Row(id=5, name="Eve", age=45),
]
df_new = spark.createDataFrame(new_data)
df_new.write.format("delta").mode("append").save("/tmp/delta-table")
📌 Delta 테이블 변경 이력 조회
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")
delta_table.history().show()
📌 출력 예시 (데이터 변경 이력)
+-------+-------------------+----------------+
|version|timestamp |operation |
+-------+-------------------+----------------+
| 1 | 2024-03-19 12:00 | WRITE (append) |
| 0 | 2024-03-19 11:50 | WRITE (overwrite) |
+-------+-------------------+----------------+
📌 Time Travel 기능을 활용하여 특정 버전의 데이터 조회
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df_version_0.show()
5️⃣ Schema Evolution을 활용한 데이터 변경 처리
📌 새로운 컬럼이 추가된 데이터 저장
new_schema_data = [
Row(id=6, name="Frank", age=50, city="Seoul"),
]
df_new_schema = spark.createDataFrame(new_schema_data)
df_new_schema.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("/tmp/delta-table")
📌 Schema Evolution 적용 후 데이터 조회
df_updated = spark.read.format("delta").load("/tmp/delta-table")
df_updated.show()
📌 출력 예시 (새로운 컬럼 반영됨)
+---+-------+---+------+
| id| name |age| city |
+---+-------+---+------+
| 1| Alice | 25| null |
| 2| Bob | 30| null |
| 6| Frank| 50| Seoul|
+---+-------+---+------+
6️⃣ Spark SQL을 활용한 Delta 테이블 분석
📌 Delta 테이블을 Spark SQL 테이블로 등록
spark.sql("CREATE TABLE delta_table USING DELTA LOCATION '/tmp/delta-table'")
📌 SQL 쿼리 실행
df_sql = spark.sql("SELECT * FROM delta_table WHERE age > 30")
df_sql.show()
📌 출력 예시
+---+-------+---+
| id| name |age|
+---+-------+---+
| 2| Bob | 30|
| 3|Charlie| 35|
+---+-------+---+
📌 실전 문제: Delta Lake 연습하기
✅ 문제 1: Delta 테이블 생성 및 저장
📌 새로운 Delta 테이블을 생성하고 데이터를 저장하세요.
# 🔽 여기에 코드 작성
data = [
Row(id=1, name="Alice", age=25),
Row(id=2, name="Bob", age=30),
]
df = spark.createDataFrame(data)
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")
✅ 문제 2: Delta 테이블 변경 이력 조회
📌 저장된 Delta 테이블의 변경 이력을 확인하세요.
# 🔽 여기에 코드 작성
delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")
delta_table.history().show()
✅ 문제 3: 특정 버전의 Delta 테이블 조회
📌 버전 0의 데이터를 조회하세요.
# 🔽 여기에 코드 작성
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df_version_0.show()
✅ 문제 4: Schema Evolution을 활용한 데이터 추가
📌 새로운 컬럼(city)을 포함한 데이터를 Delta 테이블에 추가하세요.
# 🔽 여기에 코드 작성
new_data = [
Row(id=3, name="Charlie", age=40, city="New York"),
]
df_new = spark.createDataFrame(new_data)
df_new.write.format("delta").mode("append").option("mergeSchema", "true").save("/tmp/delta-table")
728x90
'Data Engineering > python' 카테고리의 다른 글
EP31 | 고급 Python 활용 #20 | Spark 기반 데이터 파이프라인 배포 전략 (0) | 2025.03.19 |
---|---|
EP30 | 고급 Python 활용 #19 | Spark를 활용한 데이터 엔지니어링 Best Practices (0) | 2025.03.19 |
EP28 | 고급 Python 활용 #17 | Spark와 Hadoop을 결합한 대규모 데이터 분석 아키텍처 (0) | 2025.03.19 |
EP27 | 고급 Python 활용 #16 | Spark를 활용한 머신러닝 기반 이상 탐지 시스템 구축 (0) | 2025.03.19 |
EP26 | 고급 Python 활용 #15 | Spark를 활용한 실시간 데이터 파이프라인 구축 (0) | 2025.03.19 |