Data Engineering/python

EP29 | 고급 Python 활용 #18 | Spark와 Delta Lake를 활용한 데이터 레이크 아키텍처

ygtoken 2025. 3. 19. 23:23
728x90

이 글에서 다루는 개념

Delta Lake는 ACID 트랜잭션을 지원하는 확장 가능한 데이터 레이크 솔루션입니다.
이번 글에서는 다음 내용을 학습합니다.

  • 데이터 레이크(Data Lake) 개념 및 Delta Lake의 필요성
  • Spark와 Delta Lake 연동하여 데이터 저장 및 관리
  • Delta Lake의 ACID 트랜잭션 및 데이터 버전 관리
  • Schema Evolution을 활용한 데이터 변경 처리
  • Spark SQL을 활용한 Delta 테이블 분석

1️⃣ 데이터 레이크(Data Lake)와 Delta Lake 개념

📌 데이터 레이크란?

  • 원시 데이터(정형, 비정형)를 그대로 저장하여 분석할 수 있는 저장소
  • 대규모 데이터 저장이 가능하지만, 데이터 무결성(ACID 보장)이 부족

📌 Delta Lake란?

  • Apache Spark 기반의 트랜잭션 데이터 레이크 솔루션
  • ACID 트랜잭션을 지원하여 안정적인 데이터 분석 가능
  • Schema Evolution 기능을 통해 스키마 변경을 유연하게 처리

📌 Delta Lake vs 기존 데이터 레이크 비교

항목 기존 데이터 레이크 Delta Lake
ACID 트랜잭션 ❌ 지원 안 함 ✅ 지원
스키마 변경 제한적 유연한 Schema Evolution
데이터 버전 관리 ❌ 없음 ✅ 지원 (Time Travel)
성능 최적화 낮음 ✅ Delta Caching 지원

2️⃣ Spark와 Delta Lake 연동하기

📌 Delta Lake 라이브러리 설치

pip install delta-spark

 

📌 SparkSession 생성 (Delta Lake 지원)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

 

📌 Delta Lake 사용을 위한 Spark 설정

spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

3️⃣ Delta Lake 데이터 저장 및 관리

📌 Delta 테이블로 데이터 저장

from pyspark.sql import Row

data = [
    Row(id=1, name="Alice", age=25),
    Row(id=2, name="Bob", age=30),
    Row(id=3, name="Charlie", age=35),
]

df = spark.createDataFrame(data)

# Delta 테이블로 저장
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")

 

📌 Delta 테이블 읽기

df_delta = spark.read.format("delta").load("/tmp/delta-table")
df_delta.show()

 

📌 출력 예시

+---+-------+---+
| id|  name |age|
+---+-------+---+
|  1| Alice | 25|
|  2|   Bob | 30|
|  3|Charlie| 35|
+---+-------+---+

4️⃣ Delta Lake의 ACID 트랜잭션 및 데이터 버전 관리

📌 새로운 데이터 추가 (append 모드)

new_data = [
    Row(id=4, name="David", age=40),
    Row(id=5, name="Eve", age=45),
]

df_new = spark.createDataFrame(new_data)

df_new.write.format("delta").mode("append").save("/tmp/delta-table")

 

📌 Delta 테이블 변경 이력 조회

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")
delta_table.history().show()

 

📌 출력 예시 (데이터 변경 이력)

+-------+-------------------+----------------+
|version|timestamp         |operation       |
+-------+-------------------+----------------+
|     1 | 2024-03-19 12:00 | WRITE (append) |
|     0 | 2024-03-19 11:50 | WRITE (overwrite) |
+-------+-------------------+----------------+

 

📌 Time Travel 기능을 활용하여 특정 버전의 데이터 조회

df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df_version_0.show()

5️⃣ Schema Evolution을 활용한 데이터 변경 처리

📌 새로운 컬럼이 추가된 데이터 저장

new_schema_data = [
    Row(id=6, name="Frank", age=50, city="Seoul"),
]

df_new_schema = spark.createDataFrame(new_schema_data)

df_new_schema.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/tmp/delta-table")

 

📌 Schema Evolution 적용 후 데이터 조회

df_updated = spark.read.format("delta").load("/tmp/delta-table")
df_updated.show()

 

📌 출력 예시 (새로운 컬럼 반영됨)

+---+-------+---+------+
| id|  name |age| city |
+---+-------+---+------+
|  1| Alice | 25| null |
|  2|   Bob | 30| null |
|  6|  Frank| 50| Seoul|
+---+-------+---+------+

6️⃣ Spark SQL을 활용한 Delta 테이블 분석

📌 Delta 테이블을 Spark SQL 테이블로 등록

spark.sql("CREATE TABLE delta_table USING DELTA LOCATION '/tmp/delta-table'")

 

📌 SQL 쿼리 실행

df_sql = spark.sql("SELECT * FROM delta_table WHERE age > 30")
df_sql.show()

 

📌 출력 예시

+---+-------+---+
| id|  name |age|
+---+-------+---+
|  2|   Bob | 30|
|  3|Charlie| 35|
+---+-------+---+

📌 실전 문제: Delta Lake 연습하기


문제 1: Delta 테이블 생성 및 저장

📌 새로운 Delta 테이블을 생성하고 데이터를 저장하세요.

# 🔽 여기에 코드 작성
data = [
    Row(id=1, name="Alice", age=25),
    Row(id=2, name="Bob", age=30),
]

df = spark.createDataFrame(data)
df.write.format("delta").mode("overwrite").save("/tmp/delta-table")

문제 2: Delta 테이블 변경 이력 조회

📌 저장된 Delta 테이블의 변경 이력을 확인하세요.

# 🔽 여기에 코드 작성
delta_table = DeltaTable.forPath(spark, "/tmp/delta-table")
delta_table.history().show()

문제 3: 특정 버전의 Delta 테이블 조회

📌 버전 0의 데이터를 조회하세요.

# 🔽 여기에 코드 작성
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df_version_0.show()

문제 4: Schema Evolution을 활용한 데이터 추가

📌 새로운 컬럼(city)을 포함한 데이터를 Delta 테이블에 추가하세요.

# 🔽 여기에 코드 작성
new_data = [
    Row(id=3, name="Charlie", age=40, city="New York"),
]

df_new = spark.createDataFrame(new_data)
df_new.write.format("delta").mode("append").option("mergeSchema", "true").save("/tmp/delta-table")

 

728x90