빅데이터의 시대에 따른 새로운 스토리지 솔루션
작성자 : 박민서
아파치 스파크를 통한 안정적인 데이터 레이크 구축
- 아파치 스파크를 통한 안정적인 데이터 레이크 구축
- 최적의 스토리지 솔루션의 중요성
- 데이터베이스
- 데이터 레이크
- 레이크하우스
- 아파치 스파크 및 델타 레이크로 레이크하우스 구축
- 변화하는 데이터를 수용하기 위해 진화하는 스키마
- 끝
데이터를 분석하는 데에 있어서 파이프라인의 목표는 처리된 데이터를 쿼리하고 그로부터 통찰력을 얻는 것이다. 데이터 파이프라인의 견고성과 성능을 결정하는 것이 스토리지 솔루션 이다. 스토리지 솔루션의 주요 기능과, 데이터베이스, 데이터 레이크, 레이크 하우스에 대해 알아보자!
최적의 스토리지 솔루션의 중요성
먼저 스토리지 솔루션에 필요한 속성에 대해 알아보자.
확장성 및 성능
스토리지 솔루션이라면 데이터 볼륨에 맞게 확장할 수 있어야 하며 워크로드에 필요한 읽기/쓰기 처리량 및 지연 시간을 제공할 수 있어야 한다.
트랜잭션 지원
복잡한 워크로드는 동시에 데이터를 읽고 쓰기 때문에 ACID 트랜잭션에 대한 지원이 가능해야 한다.
ACID: 데이터베이스 트랜잭션이 안전하게 수행된다는 것을 보장하기 위한 성질을 나타내는 것. *원자성(atomicity), 일관성(consistency), 독립성(isolation), 지속성(durability)
다양한 데이터 형식 지원
비정형(텍스트), 반정형(JSON), 정형(테이블) 데이터를 저장할 수 있어야 한다.
다양한 워크로드 지원
SQL/배치/스트리밍/ML 및 AI 워크로드와 같이 다양한 비즈니스 워크로드를 지원할 수 있어야 한다.
개방성
데이터를 오픈 데이터 형식으로 저장해야 하는 경우가 많으므로, 오픈 API를 사용하여 다양한 도구와 엔진에서 데이터에 액세스를 가능하게 한다.
이제 본격적으로 데이터베이스에서 데이터 레이크로 어떻게 발전했는지 데이터베이스와 데이터 레이크의 장점을 모두 제공하는 데이터 레이크하우스라는 차세대 스토리지 솔루션에 대하여 알아보자.
데이터베이스
데이터베이스는 구조화된 데이터를 SQL 쿼리를 사용하여 읽을 수 있는 테이블로 저장하도록 설계되었다. 데이터는 데이터베이스 관리 시스템이 최적화할 수 있도록 하는 엄격한 스키마를 준수해야 한다. 고도로 최적화된 쿼리 처리 엔진과 긴밀하게 연결해 강력한 트랜잭션 ACID를 보장하며 저장된 데이터를 매우 빠르게 게산한다.
데이터베이스의 SQL 워크로드는 크게 아래의 두 가지 범주로 분류할 수 있다.
온라인 트랜잭션 처리(online transaction processing, OLTP) 워크로드
한 번에 몇 개의 레코드를 읽거나 업데이트하는 간단한 쿼리로 높은 동시성, 짧은 지연 시간이 특징이다.
온라인 분석 처리(online analytical processing, OLAP)
많은 레코드에 대한 높은 처리량 스캔이 필요한 복잡한 쿼리가 특징이다.
아파치 스파크의 경우 OLAP 워크로드용으로 설계된 쿼리 엔진이다.
아파치 스파크를 사용하여 DB 읽고 쓰기
JDBC가 있는 데이터베이스(ex: PostgreSQL, MySQL)의 경우 내장 JDBC 데이터 소스를 적절한 JDBC 드라이버 jar 파일을 사용하여 데이터에 액세스 할 수 있다.
데이터베이스의 한계
최근 분석 워크로드의 두 가지 새로운 추세
데이터 크기의 증가
빅데이터가 도래하면서 기업이나 조직에서 수집하는 데이터의 양이 크게 증가했다. (과거:기가바이트 -> 현재: 테라바이트/페타바이트)
분석의 다양성 증가
데이터 수집의 증가와 함께 더 깊은 통찰력이 필요하게 되면서 머신러닝 및 딥러닝과 같은 복잡한 분석이 폭발적으로 성장하게 되었다.
이런 두 가지 추세를 수용하는 데에 부적합한 데이터베이스
데이터베이스를 확장하는 데에 드는 비용 문제
데이터베이스는 단일 시스템에서 데이터를 처리하는 데에 매우 효율적이지만 데이터 볼륨의 증가 속도가 문제였다. 처리 엔진을 위한 유일한 방법은 여러 시스템을 사용하여 데이터를 병렬로 처리하는 수평 확장인데 오픈소스 데이터베이스는 분산 처리를 수행하기 위한 확장이 가능하도록 설계된 것이 아니다. 특수한 데이터베이스를 사용하기에는 구입 및 유지 관리 비용이 굉장히 많이 든다.
비SQL 기반 분석을 잘 지원하지 않음
데이터베이스는 일반적으로 해당 DB의 SQL 처리 엔진만 읽을 수 있도록 고도로 최적화된 복잡한 형식으로 데이터를 저장한다. 그렇기 떄문에 머신러닝 및 딥러닝 시스템과 같은 다리 처리 도구가 데이터에 효율적으로 액세스할 수 없다.
이런 데이터베이스의 한계 때문에 데이터 레이크가 개발되었다.
데이터 레이크
데이터 레이크란 범용 하드웨어에서 실행되고 수평으로 쉽게 확장되는 분산 스토리지 솔루션이다.
분산 컴퓨팅 시스템에서 분산 스토리지 시스템을 분리하여 워크로드에 의한 필요에 따라 확장할 수 있다.
데이터는 모든 처리 엔진이 표준 API를 사용하여 읽고 쓸 수 있도록 오픈 형식의 파일로 저장되며, 아파치 하둡 프로젝트의 HDFS(하둡 파일 시스템)에 의해 대중화되었다.
데이터 레이크를 구축할 때 결정해야 하는 옵션들을 살펴보자.
스토리지 시스템
클러스터에서 HDFS를 실행하거나 클라우드 개체 저장소를 사용하도록 선택한다.
파일 형식
다운스트림 워크로드에 따라 데이터는 정형/반정형/비정형 형식의 파일로 저장된다.
처리 엔진
수행할 분석 작업 부하의 종류에 따라 처리 엔진이 선택된다.
배치 처리 엔진(스파크, 프레스토, 아파치 하이브), 스트림 처리 엔진(스파크, 아파치 플링크) 또는 머신러닝 라이브러리(스파크 MLlib, 사이킷런scikit-learn,R)
아파치 스파크를 사용하여 데이터 레이크에서 읽고 쓰기
다양한 워크로드 지원
배치 처리, ETL 작업, 스파크 SQL을 사용한 SQL 작업, 정형화 스트리밍을 사용한 스트림 처리, MLlib를 사용한 머신러닝을 포함하여 다양한 워크로드를 처리하는 데 필요한 모든 도구를 제공한다.
다양한 파일 형식 지원
비정형, 반정형, 정형 파일 형식을 지원한다.
다양한 파일 시스템 지원
빅데이터 생태계에서 사실상 표준이라고 할 수 있는 하둡의 파일 시스템 API를 지원하는 모든 스토리지 시스템에 대해 데이터를 접근할 수 있다. 대부분의 클라우드 및 온프레미스 스토리지 시스템이 하둡의 파일 시스템 API를 지원하므로 대부분의 스토리지 시스템에서 읽고 쓰는 것이 가능하다.
그러나 많은 파일 시스템의 경우 안전한 방식으로 파일 시스템에 액세스할 수 있도록 스파크를 구성해야 하므로 클라우드 스토리지 시스템은 표준 파일 시스템에서 기대하는 것과 파일 작업의 특성이 동일하지 않아서 처리 결과가 일관되지 않는 경우가 종종 있다.
데이터 레이크의 한계
트랜잭션에 대한 보증이 부족하다는 큰 문제점이 있다.
원자성과 독립성
작업이 실패하면 이미 작성된 파일을 롤백하는 메커니즘이 없으므로 잠재적으로 손상된 데이터가 남을 수 있다.
일관성
실패한 쓰기에 대한 원자성 부족으로 인해 데이터에 대한 일관성을 잃게 된다.
데이터 레이크의 이러한 제약사항을 해결하기 위해 개발자는 아래와 같은 트릭을 사용한다.
-
열 값에 따라 하위 디렉터리로 ‘파티션’
일부 레코드를 업데이트 하거나 삭제하기 위해서 종종 하위 파티션 디렉터리 전체 데이터를 다시 쓴다. -
데이터에 대한 동시 액세스 또는 이로 인한 데이터 불일치를 회피하기 위해 데이터 업데이트 작업 및 데이터 쿼리 작업 일정에 시차를 두고 실행한다.
레이크하우스
레이크하우스 lakehouse 는 OLAP 워크로드를 위한 새로운 패러다임이다. 데이터 레이크에 사용되는 확장 가능한 저비용 스토리지에서 직접 데이터베이스와 유사한 데이터 관리 기능을 제공하는 새로운 시스템 설계로 가능하다.
트랜잭션 지원
동시 작업 부하가 있는 경우 ACID 보장을 제공한다.
스키마 적용 및 거버넌스
잘못된 스키마가 있는 데이터가 테이블에 삽입되는 것을 방지하고, 필요할 때 스키마를 명시적으로 발전시켜 지속해서 변화하는 데이터를 수용 가능하다. 데이터 무결성에 대해 추론 가능해야 하며, 강력한 거버넌스 및 감사 메커니즘이 있다.
오픈 형식의 다양한 데이터 유형 지원
레이크하우스는 구조화, 반구조화 또는 비구조화를 포함하여 데이터 애플리케이션에 필요한 모든 유형의 데이터를 다룰 수 있다. 다양한 도구가 데이터에 직접 효율적으로 액세스 할 수 있도록 하려면 데이터를 쓰고 읽을 수 있도록 표준화된 API를 사용하여 데이터를 오픈 형식으로 저장해야 한다.
다양한 워크로드 지원
다양한 워크로드가 단일 저장소의 데이터에 대해 작동할 수 있도록 하여 고립된 데이터인 사일로(다양한 데이터 범주에 대한 여러 저장소)를 분해하면 개발자가 다양하고 복잡한 데이터 솔루션을 보다 쉽게 구축할 수 있다.
업서트 및 삭제 지원
변경 데이터 캡처 change-data-capture, CDC 및 저속 변경 디멘션 slowly changing dimension, SCD 작업과 같은 복잡한 사용 사례에서는 테이블의 데이터를 지속적으로 업데이트해야 하는데, 레이크하우스는 트랜잭션 보장으로 데이터를 동시에 삭제하고 업데이트 할 수 있다.
데이터 거버넌스
레이크하우스는 데이터 무결성에 대해 추론하고, 정책 준수를 위해 모든 데이터 변경사항을 감사할 수 있는 도구를 제공한다.
레이크하우스를 구축하는 데 사용할 수 있는 오픈 소스 시스템에는 아파치 후디 Apache Hudi, 아파치 아이스버그 Apache Iceberg 및 델타 레이크 Delta Lake 가 있다.
이들은 아래와 같은 작업을 할 수 있다.
- 확장 가능한 파일 시스템에 정형 파일 형식으로 대용량 데이터를 저장
- 트랜잭션 로그를 유지하여 데이터에 대한 원자적 변경 타임라인을 기록
- 로그를 사용하여 테이블 데이터의 비전을 정의하고 읽기와 쓰기 간의 스냅샷 격리 보장을 제공
- 아파치 스파크를 사용하여 테이블에 대한 읽기 및 쓰기 제공
각 프로젝트는 이 외에도 API, 성능 및 아파치 스파크의 데이터 소스 API와의 통합 수준 측면에서 고유한 특성을 가진다.
아파치 후디
우버 엔지니어링에서 처음 구축한 것으로, 키/값 스타일 데이터에 대한 증분값의 업서트 upsert 및 삭제를 위해 설계된 데이터 저장 형식이다. 데이터는 열 기반 형식(ex. 파케이 파일)과 행 기반 형식의 조합으로 저장된다.
아파치 후디가 지원하는 추가 기능
- 빠르고 플러그 가능한 인덱싱으로 업서트
- 롤백 지원을 통한 데이터 원자성 게시
- 테이블에 대한 증분 변경 읽기
- 데이터 복구를 위한 저장점
- 통계를 이용한 파일 크기 및 레이아웃 관리
- 행 및 열 데이터의 비동기 압축
아파치 아이스버그
거대한 데이터 세트를 위한 또 다른 오픈 스토리지 형식으로 단일 테이블에서 페타바이트까지 증가할 수 있는 확장성과 스키마의 진화 특성을 가지는 범용 데이터 스토리지에 더 중점을 두었다.
아파치 아이스버그가 지원하는 추가 기능
- 열 필드 및 중첩 구조의 추가/삭제/업데이트/이름 변경/재정렬을 통항 스키마 진화 기능
- 테이블의 행에 대한 파티션 값을 내부적으로 생성하는 숨겨진 파티셔닝 기능
- 데이터 볼륨 또는 쿼리 패턴이 변경될 때 테이블 레이아웃을 업데이트 하기 위해 메타데이터 작업을 자동으로 수행하는 파티션 진화 기능
- ID 또는 타임스탬프별로 특정 테이블 스냅샷을 쿼리할 수 있는 시간 탐색 time travel 기능
- 오류를 수정하기 위한 이전 버전으로의 롤백 기능
- 여러 동시 쓰기 작업 간에도 직렬화 가능한 격리 serializable islation 기능
델타 레이크
아파치 스파크의 창시자가 구축한 리눅스 파운데이션에서 호스팅하는 오픈 소스 프로젝트이다. 트랜잭션 보증을 제공하고 스키마 시행 및 발전을 가능하게 하는 오픈 데이터 저장 형식이다.
델타 레이크가 지원하는 추가 기능
- 정형화 스트리밍 소스 및 싱크를 사용하여 테이블에서 스트리밍 읽기 및 쓰기
- 자바, 스칼라 및 파이썬 API에서도 업데이트, 삭제 및 병합(업서트용) 작업
-테이블 스키마를 명시적으로 변경하거나 데이터 프레임의 쓰기 중에도 데이터 프레임의 스키마를 테이블의 스키마에 암시적으로 병합하여 스키마를 변경 가능 - ID 또는 타임스탬프별로 특정 테이블 스냅샷을 쿼리할 수 있는 시간 탐색 기능
- 오류를 수정하기 위한 이전 버전의로의 롤백 기능
- SQL, 배치 처리 또는 스트리밍 작업을 수행하는 여러 동시 작성기 간의 직렬화 가능한 격리
델타 레이크는 아파치 스파크 데이터 원본(배치 및 스트리밍 워크로드 모두) 및 SQL 작업(ex. MERGE)과 가장 긴밀하게 통합되어 있다!
아파치 스파크 및 델타 레이크로 레이크하우스 구축
델타 레이크와 아파치 스파크 구성
아래 방법 중 하나로 델타 레이크 라이브러리에 연결하도록 아파치 스파크를 구성할 수 있다.
대화형 셸 설정
아파치 스파크 3.0을 사용하는 경우, 아래 인자로 델타 레이크에서 파이스파크 또는 스칼라 셸을 시작할 수 있다.
--packages io.delta:delta-core_2.12:0.7.0
// 파이스파크
pyspark --packages io.delta:delta-core_2.12:0.7.0
스파크 2.4를 실행하는 경우에는 델타 레이크 0.6.0을 사용해야 한다.
메이븐 좌표를 사용하여 독립형 스칼라/자바 프로젝트 설정
메이븐 중앙 저장소의 델타 레이크 바이너리를 사용하여 프로젝트를 빌드하려는 경우, 프로젝트 종속성에 메이븐 좌표를 추가하면 된다.
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>0.7.0</version>
</dependency>
마찬가지로 스파크 2.4를 실행하는 경우 델타 레이크 0.6.0을 사용해야 한다.
델타 레이크 테이블에 데이터 로드
정형 데이터 형식을 사용하여 데이터 레이크를 구축하는 방법과 유사하게 델타 레이크 형식으로 마이그레이션 하여 사용하면 된다.
format(“parquet”) 대신 format(“delta”) 를 사용하도록 모든 데이터 프레임 읽기 및 쓰기 작업을 변경하기만 하면 된다!!
// 스칼라 예제
// 소스 데이터 경로 설정
val sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
// 델타 레이크 경로 설정
val deltaPath = "/tmp/loans_delta"
// 델타 테이블 생성
spark
.read
.format("parquet")
.load(sourcePath)
.write
.format("delta")
.save(de;taPath)
// loans_delta라고 하는 데이터의 뷰를 생성
spark
.read
.format("delta")
.load(deltaPath)
.createOrReplaceTempView("loans_delta")
# 파이썬 예제
# 소스 데이터 경로 설정
sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
# 델타 레이크 경로 설정
deltaPath = "/tmp/loans_delta"
# 델타 테이블 생성
(spark.read.format("parquet").load(sourcePath)
.write.format("delta").save(deltaPath))
# loans_delta라고 하는 데이터의 뷰를 생성
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")
생성한 델타 테이블과 뷰를 이용하여 데이터를 읽고 탐색할 수 있다.
// 스칼라 및 파이썬 예제
// 로우 카운트
spark.sql("SELECT count(*) FROM loans_delta").show()
// 테이블의 첫 5행
spark.sql("SELECT * FROM loans_delta LIMIT 5").show()
델타 레이크 테이블에 데이터 스트림 로드
정적 데이터 프레임처럼 형식을 “delta”로 설정하여 델타 레이크 테이블에 쓰고 읽도록 기존 정형화 스트리밍 작업을 수정할 수 있다.
// 스칼라 예제
import org.apache.spark.sql.streaming._
val newLoanStreamDF = ... // 새 데이터를 사용하는 스트리밍 데이터 프레임
val checkpointDir = ... // 스트리밍 체크포인트를 위한 디렉터리
val steramingQuery = newLoanStreamDF.writeStream
.format("delta")
.option("checkpointLoaction", checkpointDir)
.trigger(Trigger.ProcessingTime("10 seconds"))
.start(deltaPath)
# 파이썬 예제
newLoanStreamDF = ... # 새 데이터를 사용하는 스트리밍 데이터 프레임
checkpointDir = ... # 스트리밍 체크포인트를 위한 디렉터리
steramingQuery = (newLoanStreamDF.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir)
.trigger(processingTime = "10 seconds")
.start(deltaPath))
델타 형식은 정형화 스트리밍이 종단 간 일회 처리를 보장하며 아래와 같은 추가적 이점도 제공한다.
배치 처리 및 스트리밍 작업 모두에서 동일한 테이블에 쓰기 가능
다른 형식을 사용할 경우, 스트리밍 쓰기에 대해서 일회 처리 보장을 위해 테이블에 관리되는 메타데이터가 다른 비스트리밍 쓰기를 고려하지 않는다. 이로 인해 정형화 스트리밍 작업에서 테이블에 작성된 데이터가 테이블의 기존 데이터를 덮어쓰게 된다.
그러나 델타 레이크의 메타데이터 관리는 매치 및 스트리밍 데이터를 모두 작성할 수 있게 한다.
여러 스트리밍 작업에서 동일한 테이블에 데이터 추가 가능
다른 형식의 메타데이터는 여러 정형화 스트리밍 쿼리가 동일한 테이블에 추가되는 것을 허용하지 않는다.
그러나 델타 레이크의 메타데이터는 각 스트리밍 쿼리에 대한 트랜잭션 정보를 유지하기 때문에 원하는 수의 스트리밍 쿼리가 일회 처리 보장되는 테이블에도 동시에 쓰기 가능하다.
동시 쓰기에서도 ACID 보장을 제공
델타 레이크는 동시 배치 및 스트리밍 작업이 ACID 보장으로 데이터를 쓸 수 있게 한다.
데이터 손상을 방지하기 위해 쓰기 시 스키마 적용
델타 레이크 형식은 스키마를 테이블 수준 메타데이터로 기록하기 떄문에 델타 레이크 테이블에 대한 모든 쓰기는 기록 중인 데이터에 테이블의 스키마와 호환되는 스키마가 있는지 여부를 확인할 수 있다.
호환되지 않는 경우에는 데이터가 기록되고 테이블에 커밋되기 전에 오류를 발생시켜서 이러한 데이터 손상을 방지한다.
기존 테이블에 없는 열에 데이터 작성을 시도해보고 어떻게 되는지 예시를 통해 살펴보자.
// 스칼라 예제
val loanUpdates = Seq(
(1111111L, 1000, 1000.0, "TX", false),
(2222222L, 2000, 0.0, "CA", true))
.toDF("loan_id", "funded_amnt", "paid_amnt", "addr_state", "closed")
loanUpdates.write.format("delta").mode("append").save(deltaPath)
# 파이썬 예제
from pyspark.sql.functions import *
cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
items = [
(1111111L, 1000, 1000.0, "TX", false),
(2222222L, 2000, 0.0, "CA", true)
]
loanUpdates = (spark.createDataFrame(items, cols)
.withColumn("funded_amnt", cols("funded_amnt").cast("int")))
loanUpdates.write.format("delta").mode("append").save(deltaPath)
이 쓰기를 시도하면 오류가 발생하면서 실패하게 된다.
그러나 mergeSchema 옵션을 사용하여 테이블의 스키마를 실제로 발전시킬 수 있는 방법도 제공한다.
변화하는 데이터를 수용하기 위해 진화하는 스키마
델타 레이크는 mergeSchema를 사용하여 테이블에 새로운 열을 추가 가능하다.
새로운 열은 “mergeSchema” 옵션을 “true”로 설정하면 명시적으로 추가할 수 있게 된다.
// 스칼라 예제
loanUpdates.write.format("delta").mode("append")
.option("mergeSchema", "true")
.save(deltaPath)
# 파이썬 예제
(loanUpdates.write.format("delta").mode("append")
.option("mergeSchema", "true")
.save(deltaPath))
이를 통해 새로운 열이 테이블 스키마에 추가되며, 기존 행을 읽을 때 새 열의 값은 NULL로 간주된다. 스파크 3.0에서는 SQL DDL 명령 ALTER TABLEㅇ르 사용하여 열을 추가하고 삭제할 수 있다.
기존 데이터 변환
델타 레이크는 복잡한 파이프 라인 구축이 가능한 DML 명령(UPDATE, DELETE, MERGE)을 지원한다. 예시와 함께 하나 하나 살펴보자.
오류 수정을 위한 데이터 업데이트
원래 업데이트를 하기 위해서는 아래의 단계를 거쳐야 한다.
- 영향을 받지 않는 모든 행을 새 테이블에 복사한다.
- 영향을 받는 모든 행을 데이터 프레임에 복사한 다음 데이터 수정을 수행한다.
- 이전에 언급한 데이터 프레임의 레코드를 새 테이블에 삽입한다.
- 이전 테이블을 제거하고 새 테이블의 이름을 이전 테이블 이름으로 쓴다.
스파크 3.0에서는 DML 명령을 지원하기 때문에 아래의 코드처럼 간단하게 업데이트를 수행할 수 있다.
// addr_state 항목 "OR" -> "WA"로 수정
// 스칼라
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.update(
col("addr_state") == "OR",
Map("addr_State" -> lit("WA")))
# 파이썬 예제
from delta.tables import *
deltaTable = DetlaTable.forPath(spark, deltaPath)
deltaTable.update("addr_state = 'OR'", {"addr_state":"'WA'"})
사용자 관련 데이터 삭제
// 스칼라 예제
val deltaTable = DeltaTable.forPath(spark, detlaPath)
deltaTable.delete("funded_amnt >= paid_amnt")
# 파이썬 예제
deltaTable = DeltaTable.forPath(spark, deltaPath)
deltaTable.delete("funded_amnt >= paid_amnt")
merge()를 사용하여 테이블에 변경 데이터 업서트
OLAP 워크로드를 위해 OLTP 테이블에서 행 변경사항을 다른 테이블로 복제해야 하는 변경 데이터 캡처할 때 사용한다.
// 기존 테이블에 신규 테이블 업데이트하기(동일한 스키마)
// 스칼라 예제
deltaTable
.alias("t")
.merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")
.whenMatched.updateAll()
.whenNotMatched.insertAll()
.execute()
# 파이썬 예제
(deltaTable
.alias("t")
.merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id")
.whenMatched.updateAll()
.whenNotMatched.insertAll()
.execute())
데이터 삽입 전용 병합을 사용하여 삽입하는 동안 데이터 중복 제거
델타 레이크의 병합 작업은 확장된 구문을 지원한다.
삭제 수행, 조건 구문, 선택적 수행, 스타 구문 등이 있다.
UPDATE *, INSERT * => *를 쓰면 모든 열을 대상으로 한다.
작업 내역으로 데이터 변경 감사
델타 레이크 테이블에 대한 모든 변경사항은 테이블의 트랜잭션 로그에 커밋으로 기록된다.
테이블의 작업 기록을 쿼리하려면 아래의 코드를 실행하면 된다.
# 스칼라 및 파이썬
detlaTable.history().show()
SQL을 사용하여 주요 열만 추출할 수도 있다.
// 스칼라 예제
detlaTable
.history(3)
.select("version", "timestamp", "operation", "operationParameters")
.show(false)
# 파이썬 예제
(detlaTable
.history(3)
.select("version", "timestamp", "operation", "operationParameters")
.show(false))
시간 탐색 활용 테이블의 이전 스냅샷 쿼리
DataFrameReader 옵션 “versionAsOf” 및 “timestampAsOf”를 사용하여 테이블의 이전 버전 스냅샷을 쿼리할 수 있다.
// 스칼라 예제
spark.read
.format("delta")
.option("timestampAsOf", "2020-01-01") // 테이블 생성 이후 타임스탬프
.load(deltaPath)
spark.read.format("delta")
.option("versionOfAs", "4")
.load(deltaPath)
# 파이썬 예제
(spark.read
.format("delta")
.option("timestampAsOf", "2020-01-01") # 테이블 생성 이후 타임스탬프
.load(deltaPath))
(spark.read.format("delta")
.option("versionOfAs", "4")
.load(deltaPath))
테이블의 이전 스냅샷을 쿼리하는 것은 아래와 같은 상황에서 유용하다.
-특정 테이블 버전에서 작업을 다시 실행하여 머신러닝 실험 및 보고서 재현
-감사를 위해 서로 다른 버전 간의 데이터 변경 사항 비교
-이전 스냅샷을 데이터 프리임으로 읽고 테이블을 덮어씀으로 인해 잘못된 변경사항 롤백
끝
빅데이터의 시대가 도래하며 발생한 다양한 요구사항들을 충족하지 못한 데이터베이스, 데이터베이스의 한계를 완화하기 위해 구축된 데이터 레이크. 데이터 레이크의 한계를 극복하기 위해 구축된 레이크하우스에 대해 알아보았다.