스파크 SQL과 데이터세트에 대해
작성자 : 박정현
스파크 SQL 과 데이터세트
4장, 5장에서는 스파크 SQL과 데이터 프레임 API에 대해 다루었다. 외부 데이터와 상호작용하는 방법, 스파크 SQL 엔진에 대해서 알아보았다. 또한 SQL과 데이터 프레임 간에 어떻게 상호 작용하는지, 뷰와 테이블은 어떻게 생성하고 관리하는지 등등..
3장에서는 데이터세트 API에 대해서 알아봤었음
이번 장은 데이터세트을 이해하기 위해 좀 더 내부를 살펴보도록 한다.
자바와 스칼라를 위한 단일 API
사실 나는 파이썬 애용자로 이 부분을 건너 뛸까 했지만, 여튼 모르면 찝찝할 것 같아서 다룬다.
이 그림들 처럼 데이터 세트는 통합되고 단일화된 API를 제공함. 스파크에서 지원하는 언어 중에서 오직 스칼라와 자바가 강력하게 형식화된 타입으로 지정됨 반대로, 파이썬, R은 형식화되지 않은 타입의 데이터 프레임 API를 지원하고 있다.
단일화된 API 덕분에 자바 개발자는 더이상 뒤처질 위험이 없다.
예를 들어, 스칼라의 groupBy()
, flatMap()
, filter()
API에 대한 향후에 제공될 인터페이스나 동작 변ㄱ경은 공통된 단일 인터페이스이기 때문에 자바에서도 동일하게 구현될 것이다.
데이터세트를 위한 스칼라 케이스 클래스와 자바빈
자바빈이 뭔지 모르겠다.
3장에서 봤지만 스파크는 StringType, BinaryType, IntegerType, BooleanType, MapType
같은 내부적 데이터 타입을 가지고 있으며, 스파크는 작업 중에 스칼라 및 자바의 언어별 데이터 타입에 원활하게 매핑하는 데 사용된다.
Dataset[T]
를 생성하기 위해서, 여기서 T는 스칼라에서 형식화된 객체이기 때문에 객체를 정의하는 case class가 필요하다.(저 T가 자바에서 제네릭 같은 느낌같다.)
case class란? 불변하고, 패턴 매칭을 통한 분해, 래퍼런스가 아닌 구조적인 동등성으로 비교, 초기화 및 운영이 간결한 특징이 있음. 자세한 내용은 Scala를 참고
아래 코드는 Dataset[Blogger]를 생성하는 코드임
case class Blogger(...)
이렇게 하면 원본에서 파일을 읽을 수 있음.
val bloggers = '../data/bloggers.json'
val bloggersDS = spark
.read
.format('json')
.option("path", bloggers)
.load()
.as[Blogger]
분산 데이터 컬렉션 결과의 각 행은 Blogger 유형으로 정의된다.
스칼라 및 자바에서 데이터세트를 만들기 위해 읽고 있는 행에 대한 모든 개별 칼럼 이름과 유형을 알아야 하기 때문에 사전에 먼저 고려해야 한다.(스키마 등등) 스파크가 스키마를 유추할 수 있도록 데이터 프레임과 달라 데이터세트 API에서는 미리 데이터 유형을 정의하고, 케이스 클래스 또는 자바빈 클래스가 스키마와 일치해야 한다.
심지어 원본 데이터 필드 순서와 일치해야 함. 단 데이터 각 행에 대한 컬럼명은 클래스의 해당 이름에 자동으로 매핑, 유형은 자동으로 보존
데이터 세트 API는 데이터프레임으로 작업하는 것만큼 쉽다고 함.
데이터세트 작업
샘플 데이터세트를 생성하는 간단한 방법은 SparkSession
인스턴스를 사용하는 것임.
spark.createDataset() 과 같은?
샘플 데이터 변환
데이터세트는 도메인별 객체의 강력하게 정형화된 컬렉션임을 기억해야 함. 이런 객체는 함수적 혹은 관계적인 연산을 사용해서 병렬적으로 변환 가능함.
spark에서 job 명령을 줘서 실행하는 것 같은?
예시로, map, reduce, filter, select, aggregate
함수가 있음.
고차함수의 예시로 람다, 클로저 또는 함수를 인수로 사용하고 결과를 반환할 수 있다.
고차함수 = 람다 혹은 일급객체 같은 방법 즉, 함수 자체가 인수로 사용할 수 있음. 클로저 : Java에서 사용하는 개념이다.
스칼라는 함수형 프로그래밍 언어이고, 람다, 함수형 인수, 클로저가 자바에도 추가되었다. 스파크에서 고차 함수를 몇 가지 사용해보고 앞에서 생성한 샘플 데이터와 함께 함수형 프로그래밍 구조를 사용해보자면 아래와 같다.
import org.apache.spark.sql.functions._
dsUsage
.filter(d => d.usage > 900)
.orderBy(desc("usage"))
.show(5, false)
다른 방법은 함수를 정의하고 해당 함수를 filter()
함수의 인수로 제공하는 것이다.
def filterWithUsage(u: Usage) = u.usage > 900
dsUsage.filter(filterWithUsage(_)).orderBy(desc("usage")).show(5)
첫 예시는 filter() 에 대한 인수로 람다식을 사용했고, 두 번째는 함수를 정의해서 사용했다.
두 경우 모두 filter()
함수는 분산된 데이터세트에서 Usage 객체의 각 행에 반복되어 람다 식을 적용하거나 함수를 실행하여 값이 참인 행에 대해 새로운 Usage 데이터세트를 반환하게 된다.
자바에서 필터링할 인수는 FilterFunction<T>
유형이다.(자세한건 공식 문서 참고)
이러한 방법은 어떤 컬럼에 대한 계산을 했지만, 계산값이 어떤 것과 연관되어 있는지 모르게됨.
연관을 알기 위해
- 추가 필드를 사용해서 스칼라 케이스 클래스 또는 자바빈 클래스인 UsageCost를 생성.
- 비용을 계산하는 함수를 정의하여 map 함수에 사용
map 함수를 사용하면 계산된 새로운 열, 비용을 가진 변환된 데이터 세트와 다른 모든 열을 가질 수 있음.
고차 함수 및 데이터세트 사용에 대해 몇 가지 주의해야 할 사항이 있다.
- 입력된 JVM 객체를 함수에 대한 인수로 사용하고 있다.
- 읽기 쉽게 만드는 도트 표기법을 사용하여 형식화된 JVM 객체 내의 개별 필드에 액세스한다.
- 일부 기능 및 람다 시그니처는 type-safe가 보장되어 컴파일 시점 오류 감지를 보장하고 스파크에게 어떤 데이터 유형, 수행할 작업을 지시할 수 있다.
- 코드는 람다 표현식의 자바 또는 시칼라 언어 기능을 사용하여 읽기 쉽고, 표현적이며, 간결하다.
- 스파크는 자바나 스칼라의 고차 함수 생성자가 없어도
map
이나filter
와 동등한 기능을 제공하므로 데이터세트 또는 데이터 프레임에서 함수형 프로그래밍을 사용할 필요가 없다. - 데이터세트의 경우 데이터 유형에 대한 JVM과 스파크 내부 이진 형식 간에 데이터를 효율적으로 변환하는 메커니즘인 인코더를 사용함.
데이터세트 및 데이터 프레임을 위한 메모리 관리
스파크는 인메모리 형식이다. 따라서, 메모리를 효율적으로 사용하는 것은 실행 속도에 큰 영향을 준다. 스파크 릴리스 역사를 보면 스파크의 메모리 사용은 엄청 진화했다고 함(관심있으면 찾아 보기 바람)
데이터 집합 인코더
인코더는 오프 힙 메모리의 데이터를 스파크 내부 텅스텐 포맷에서 JVM 자바 오브젝트로 변환함. 즉, 스파크 내부 형식에서 원시 데이터 유형을 포함한 JVM 객체로 데이터세트 객체를 직렬화하고 역직렬화함.
이런 특징이랑 스파크에서 인코더를 자동을 생성할 수 있는 내장 지원 기능이 있어서 스파크 인코더는 우수한 성능을 보인다고 한다. 참고
스파크 내부 형식과 자바 객체 형식 비교
자바 객체에는 헤더 정보, 해시 코드, 유니코드 정보 등 큰 오버헤드가 있다. 자바에서는 ‘abcd’와 같은 간단한 문자열도 4바이트가 아닌 48바이트를 사용한다.(참고로 한글 제외 문자 1개는 1바이트)
스파크는 데이터세트 또는 데이터 프레임을 위한 JVM 기반 객체를 생성하는 대신 오프 힙 자바 메모리를 할당해서 데이터를 레이아웃하고, 인코더를 사용해서 데이터를 메모리 내 표현에서 JVM 객체로 변환함.
데이터가 이렇게 인접한 방식으로 저장되고 포인터 산술과 오프셋을 통해 엑세스할 수 있을 때, 인코더는 데이터를 빠르게 직렬화하거나 역직렬화 할 수 있다.
이게 뭐가 중요하냐?
우선 직렬화 및 역직렬화에 대해서 알아보자면
데이터가 네트워크를 통해 자주 이동하며, 송신자에 의해 이진 표시 또는 형식으로 인코딩되고 수신자에 의해 이진 표시 또는 형식으로 인코딩(직렬화)되고 수신자에 의해 바이너리 형식에서 해당 데이터 형식 객체로 디코딩(역직렬화)되는 프로세스다(과정).
예를 들어서 그림 6-1에서 JVM 객체 MyClass를 스파크 클러스터의 노드 간에 공유해야 하는 경우, 송신자는 이를 바이트 배열로 직렬화하고 수신자는 이를 다시 MyClass 유형의 JVM 객체로 직렬화 한다.
JVM 자체에 자바 직렬화/역직렬화 기능이 내장되어 있지만, 5장에서 힙 메모리에서 JVM에 의해 생성된 자바 객체가 비대하기 때문에 비효율적
이러한 과정이 데이터세트 인코더에서는 다음과 같은 사항으로 개선됨
- 스파크의 내부 텅스텐 이진 형식(그림 6-1, 6-2)은 자바 힙 메모리에서 객체를 저장하며, 크기가 작아 공간을 적게 차지함.
- 인코더는 메모리 주소와 오프셋이 있는 간단한 포인터 계산을 사용하여 메모리를 가조질러 빠르게 직렬화할 수 있다(그림6-2).
- 수식 단부에서 인코더는 스파크의 내부 표현으로 이진 표현을 빠르게 역직렬화할 수 있기 때문에 JVM의 가비지 컬렉션의 일시 중지로 인한 방해가 없음
데이터세트 사용 비용
좀 이상하지만 좋은 것에는 부작용도 있음
데이터 프레임 VS 데이터세트에서는 데이터세트 사용의 몇 가지 이점을 간략히 설명했다.
다만 사용하는 것에 비용이 발생하는데 filter(), map() 같은 것을 사용할 때 스파크 내부 텅스텐에서 JVM객체로 역직렬화하는 비용이 발생함….
스파크 인코더 도입 이전에 사용된 다른 직렬화 방법들과 비교하면 이정도 비용은 감수할 수 있다는 분위기? 단, 처리할 데이터 세트가 많은 경우에는 조금 주의해야함.
비용 절감 전략
과도하게 직렬화, 역직렬화를 완화하기 위한 전략은 쿼리에서 DSL 표현을 사용하고 람다를 고차 함수에 대한 인수로 과도하게 사용해서 익명성을 높이는 작업을 피하는 것.
람다는 런타임까지 카탈리스크 옵티마이저에서 익명이고 명확하지 않기 때문에 이를 사용하면 사용자가 수행하는 작업을 효율적으로 식별할 수 없음
즉, 스파크에게 작업 명령을 하지 않음..
카탈리스크 옵티마이저 몰라? Saprk 3장 읽어보자고
그림으로 이해해보자
만약에 DSL만 사용하고 람다는 사용하지 않는 경우! 훨씬 더 효율적임. => 전체 합성 및 체인 쿼리에 대해 직렬/역직렬화가 필요하지 않기 때문임.
요약
- 자바 및 스칼라에서 데이터세트 관련 사용법
- 스파크가 통합된 고차원의 API의 일부인 데이터세트 구성을 수용하기 위해 메모리 관리 방법
- 데이터세트 사용과 관련된 일부 비용을 고려 및 비용을 줄이는 방법을 알아봄
- 인코더가 스파크 내부 텅스텐 이진 형식에서 JVM 객체로 직렬화하고 역직렬화하는 방법을 간단히 살펴봄