정형화된 구조를 쓰게 된 이유와 정형화 API
작성자 : 박민서
아파치 스파크의 정형화 API
- 아파치 스파크의 정형화 API
- 스파크: RDD의 아래에는 무엇이 있는가
- 스파크의 구조 확립
- 데이터 프레임 API
- 데이터세트 API
- 데이터 프레임 vs 데이터세트
- 스파크 SQL과 하부의 엔진
아파치 스파크에 정형화된 구조를 추가하게 된 이유와, 정형화 API에 대해 살펴볼 것이다.
스파크: RDD의 아래에는 무엇이 있는가
RDD의 핵심 특성인 의존성, 파티션, 연산 함수는 무엇이지 하나하나 살펴보자.
의존성 (dependency) 이란 어떤 입력을 필요로 하고, 현재의 RDD가 어떻게 만들어지는지 스파크에게 알려주는 것이다.
결과를 새로 만들어야 하는 경우에 스파크는 의존성 정보를 참고하여 재연산 후 RDD를 만들 수 있고, 이러한 특성이 RDD에 유연성을 부여한다.
파티션 (partition) 는 지역성 정보를 포함하고, 스파크에게 작업을 나눠서 이그제큐터들에 분산하여 파티션별 병렬 연산이 가능한 능력을 부여한다. 스파크는 지역성 정보를 이용해서 각 이그제큐터가 가까이 있는 데이터를 처리할 수 있도록 작업을 부여한다.
RDD의 연산 함수 는 RDD에 저장되는 데이터를 Iterator[T] 형태로 만들어주는 역할을 한다.
근데 스파크에서는 람다 표현식으로만 보이기 때문에, 사용자가 연산 함수 안에서 뭘 하는지를 알 수 없었다. 뿐만 아니라 Iterator[T] 데이터 타입이 스파크에서는 단지 파이썬 기본 객체로만 인식이 가능했다.
연산 함수 내에서 무엇을 하는지 알 수 없었기 때문에 최적화 할 수 있는 방법이 없었고, 타입에 대한 정보도 없었기 때문에 스파크가 연산을 효과적으로 하기 위해 연산 순서를 변경하는 것이 어려웠다.
스파크의 구조 확립
스파크 2에 접어들면서 구조가 확립되었다.
데이터 분석을 통해 찾은 일상적인 패턴들을 이용하여 연산을 표현하였고, 이 패턴들은 필터링, 선택, 집합연산, 집계, 평균, 그룹화 같은 고수준 연산으로 표현되었다.
DSL에서 일반적인 연산 집합을 사용하면서 API 사용이 가능해지고 이러한 연산자들은 스파크에게 데이터로 무슨 작업을 하고 싶은지, 결과로 무엇을 원하는지 알려줄 수 있게 되었으며 덕분에 더 효율적인 계획을 짤 수 있게 되었다.
SQL의 테이블이나 스프레드시트처럼 정형화 데이터 타입을 사용하여 표 형태로 구성 할 수 있게 되었다.
핵심적인 장점과 이득
구조를 갖추면 표현성, 단순성, 구성 용이성, 통일성의 측면에서 많은 이득을 얻을 수 있다.
표현성과 구성 용이성을 먼저 살펴보자.
각 이름별로 모든 나이들을 그룹화하고, 나이의 평균을 구하는 예제를 통하여 비교해보자.
# 파이썬 예제 - RDD
# (name, age) 형태의 튜플로 된 RDD를 생성한다
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),
("TD", 35),("Brooke", 25)])
# 집계와 평균을 위한 람다 표현식과 함께 map과 reduceByKey 트랜스포매이션을 사용
agesRDD = (dataRDD
.map(lambda x: (x[0], (x[1], 1))))
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
.map(lambda x: (x[0], x[1][0]/x[1][1]))
위처럼 스파크에게 어떻게 키를 집계하고 평균 계산을 하는지 람다함수로 알려주는 저수준의 RDD API를 사용하는 경우, 코드가 복잡하고 코드를 한 눈에 이해하기가 어렵다는 단점이 있다.
# 파이썬 예제 - DSL 연산자와 API
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
# SparkSession으로부터 데이터 프레임을 만든다
spark = (SparkSession
.builder
.appName("AuthorsAges")
.getOrCreate())
# 데이터 프레임 생성
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])
# 동일한 이름으로 그룹화하여 나이별로 계산해 평균을 구한다.
avg_df = data_df.groupBy("name").agg(avg("age"))
# 최종 실행 결과를 보여준다
avg_df.show()
반면에, 스파크에게 무엇을 할 지를 알려주는 고수준 DSL 연산자들과 데이터 프레임 API 를 쓴다면, RDD에 비하여 훨씬 더 표현력이 높으며, 이전 버전에 비하여코드도 간단하다.
스파크의 상위 수준 API는 컴포넌트들과 언어를 통틀어 일관성을 가지는 편이다. (같은 일을 하면서 형태도 비슷)
데이터 프레임 API
구조, 포맷 등 판다스 데이터 프레임에 영향을 받은 스파크 데이터 프레임은 이름 있는 칼럼과 스키마를 가진 분산 인메모리 테이블처럼 동작한다.
(각 컬럼은 정수 integer, 문자열 string, 배열 array, 맵 map, 실수 real, 날짜 date, 타임스탬프 timestamp 등의 데이터타입을 가질 수 있다)
데이터 프레임은 불변성을 지니며 스파크는 그에 대한 모든 변경 내역을 관리한다. 또한 데이터 프레임에서 이름이 붙은 칼럼과 연관 데이터 타입은 스키마에 선언할 수 있다.
스파크의 기본 데이터 타입
먼저, 스파크에서 지원하는 기본적인 스칼라 데이터 타입을 살펴보자.
다음은 스파크에서 지원하는 파이썬 기본 데이터 타입이다.
스파크의 정형화 데이터 타입
스파크의 정형화 데이터 타입에는 맵 map, 배열 array, 구조체 struct, 날짜 date, 타임스탬프 *timestamp, 필드 field 등이 있다.
먼저, 스파크에서 지원하는 스칼라 정형화 데이터 타입을 살펴보자.
다음은 스파크에서 지원하는 파이썬 정형화 데이터 타입이다.
스키마와 데이터 프레임 만들기
스키마 schema : 데이터 프레임을 위해 칼럼 이름과 연관된 데이터 타입을 정의한 것으로, 외부 데이터 소스에서 구조화된 데이터를 읽어 들일 때 쓰인다.
미리 스키마를 정의하면 무슨 장점이 있을까?
첫째, 스파크가 데이터 타입을 추측해야 하는 책임을 덜어 준다.
둘째, 스파크가 스키마를 확정하기 위해 파일의 많은 부분을 읽어들이려고 별도의 잡을 만드는 것을 방지하여 비용과 시간을 아낄 수 있다.
셋째, 데이터가 스키마와 맞지 않는 경우에 조기에 문제를 발견할 수 있다.
스키마를 정의하는 방법에 대해 알아보자.
스키마를 정의할 때는 프로그래밍 스타일로 정의할 수도 있고, DDL data definition language 를 사용하여 정의할 수도 있다.
프로그래밍 스타일로 정의하기 위해서는 스파크 데이터 프레임 API 를 사용하면 된다.
// 스칼라 예제
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("author", StringType, false),
StructField("title", StringType, false),
STructField("pages", IntegerType, false)))
# 파이썬 예제
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
StructField("title", StringType(), False),
StructField("pages", IntegerType(), False)])
DDL 을 사용하면 더욱 간단하다
// 스칼라 예제
val schema = "author STRING, title STRING, pages INT"
schema = "author STRING, title STRING, pages INT"
스키마를 코드의 다른 부분에서 사용하고 싶을 떄는 데이터 프레임명.schema 를 호출하면 스키마 정의를 리턴해준다.
칼럼과 표현식
데이터 프레임에서 이름이 정해진 칼럼들은 특정한 타입의 필드를 나타내는 개념이라고 생각하면 된다.
사용자는 이름으로 칼럼을 나열할 수도 있고, 표현식을 통하여 각 값들에 연산을 수행할 수도 있다. (표현식을 사용할 때는 expr()을 사용하며, 스파크가 결과를 계산하여 표현식으로 해석할 수 있는 인자를 받는다.)
칼럼은 공개 *public 메소드를 가진 객체, 칼럼 타입으로 표현된다.
특정 칼럼으로 데이터 프레임을 정렬하는 두 가지 방법을 알아보자.
// 명시적으로 col("칼럼명") 이라는 함수를 사용하여 Column 객체를 리턴함
데이터 프레임명.sort(col("칼럼명").desc)
// 칼럼 이름 앞에 $를 붙여서 따옴표 안에 적힌 이름의 칼럼을 Column 타입으로 변환해주는 스파크의 함수를 사용함
데이터 프레임명.sort($"컬럼명".desc)
데이터 프레임의 Column 객체는 단독으로 존재할 수 없다. 각 칼럼은 로우의 일부분이며 모든 로우가 합쳐져서 하나의 데이터 프레임을 구성한다.
로우
하나 이상의 칼럼을 갖고 있는 로우 row 객체는 스파크의 객체이고 순서가 있는 필드 객체 이므로 각 필드를 0부터 시작하는 인덱스로 접근 한다.
// 스칼라 예제
import org.apache.spark.sql.Row
// Row 객체 생성
val blogRow = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
Array("twitter", "LinkedIn"))
// 인덱스로 개별 아이템에 접근해보기
blogRow(1)
res62: Any = Reynold
# 파이썬 예제
from pyspark.sql import Row
blog_row = Row(6,"Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015",
["twitter", "LinkedIn"])
# 인덱스로 개별 아이템에 접근해보기
blog_row[1]
'Reynold'
Row 객체들은 빠른 탐색을 위하여 데이터 프레임으로 만들어서 사용하기도 한다.
// 스칼라
val rows = Seq(("Matei Zaharia", "CA"), ("Reynold Xin", "CA"))
val authorsDF = rows.toDF("Author", "State")
# 파이썬
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Author", "State"])
authors_df
구조화된 데이터를 갖고 있는 데이터 소스에서 데이터 프레임으로 로드 하기 위해 DataFrameReader 라는 이름의 인터페이스를 제공한다. 이는 JSON, CSV, 파케이 Parquet, 텍스트, 에이브로, ORC 같은 다양한 포맷의 데이터 소스에서 데이터 프레임으로 만들 수 있게 해준다.
# 파이썬에서 스키마 정의하기 <- 위에서 설명한 방법으로 정의
t_schema = ...
# DataFrameReader 인터페이스로 CSV 파일 읽기
t_file = "/.../../file1.csv" # 파일 경로
file_df = spark.read.csv(t_file, header=True, schema=t_schema)
// 스칼라에서 스키마 정의하기 <- 위에서 설명한 방법으로 정의
val s_schema = ...
// CSV DataFrameReader로 파일을 읽는다
val s_file = "/.../../file1.csv" // 파일 경로
val sDF = spark.read.schema(s_schema)
.option("header", "true")
.csv(s_file)
spark.read.csv() 함수는 CSV 파일을 읽어서 row 객체와 스키마에 맞는 타입의 이름 있는 칼럼들로 이루어진 데이터 프레임을 리턴한다.
데이터 프레임을 외부 데이터 소스에 원하는 포맷으로 내보내기 위해서는 DataFrameWriter 을 사용하면 된다.
데이터 프레임을 파케이 파일으로 저장해보자.
// 스칼라에서 파케이로 저장
val parquetPath = ...
sDF.write.format("parquet").save(parquetPath)
# 파이썬에서 파케이로 저장
parquet_path = ...
file_df.write.format("parquet").save(parquet_path)
테이블로 저장하려면 마지막 부분만 수정해주면 된다.
// 스칼라에서 파케이로 저장
val parquetTable = ... // 테이블 이름
sDF.write.format("parquet").saveAsTable(parquetTable)
# 파이썬에서 파케이로 저장
parquet_table = ... # 테이블 이름
file_df.write.format("parquet").saveAsTable(parquet_table)
프로젝션과 필터
프로젝션 projection 은 필터를 이용하여 특정 관계 상태와 매치되는 행들만 리턴해주는 방법이다.
프로젝션은 select() 메서드로 수행하고, 필터는 filter() 나 where() 메서드로 표현된다.
// 스칼라 예제
var fewsDF = sDF
.select("컬럼명1", "컬럼명2", ...)
.where(조건)
fewsDF.show(5)
# 파이썬 예제
few_file_df = (file_df
.select("컬럼명1", "컬럼명2", ...)
.where(조건))
few_file_df.show(5)
null이 아닌 데이터를 알고 싶을 때는
// 스칼라 예제
sDF
.select("컬럼명1")
.where($"컬럼명1".isNotNull())
.distinct()
.show(10, false)
# 파이썬 예제
(file_df
.select("컬럼명1")
.where(col("컬럼명1").isNotNull())
.distinct()
.show(10, False))
칼럼의 이름 변경
칼럼의 이름을 변경하는 방법은 크게 두 가지가 있다.
StructField를 사용하여 스키마 내에서 원하는 칼럼 이름들을 지정하는 방법. (데이터 소스의 칼럼 이름을 무시하고 원하는 이름으로 읽어오는 경우임)
다른 방법으로는 withColumnRenamed() 함수를 사용하여 원하는 이름으로 변경하는 방법이 있다.
데이터 프레임명.withColumnRenamed("원래 컬럼명", "이걸로 변경할래요")
# 컬럼명이 "원래 컬럼명" -> "이걸로 변경할래요" 로 바뀜
근데 사실 withColumnRenamed() 함수도 원래 데이터프레임은 냅두고 칼럼 이름이 변경된 새로운 데이터 프레임을 하나 만드는 것이다.
데이터 프레임 내 컬럼의 데이터가 시간이나 날짜를 뜻하는 문자열일 경우에는 to_timestamp()나 to_date() 같은 to/from - date/timestamp 이름의 함수들을 사용하여 변환 가능하다.
집계 연산
groupBy(), orderBy(), count()는 칼럼 이름으로 집계해서 각각의 개수를 세어준다.
통계 함수
데이터 프레임 API는 min(), max(), sum(), avg() 등의 통계함수들을 지원한다.
데이터세트 API
데이터 세트는 정적 타입 typed API 와 동적 타입 untyped API 의 두 특성을 모두 가진다.
스칼라의 데이터 프레임은 Dataset[Row]의 다른 이름이라고 생각할 수 있으며, Row는 서로 다른 타입의 값을 저장할 수 있는 포괄적 JVM 객체라고 할 수 있다.
데이터세트는 스칼라에서 엄격하게 타입이 정해진 JVM 객체의 집합이며 자바의 클래스라고 볼 수 있다.
Row는 스파크의 포괄적 객체 타입이며 (배열처럼) 인덱스를 사용하여 접근할 수 있으며 다양한 타입의 값들을 가져올 수 있다. 스파크는 스파크에서 지원하는 타입들로 바꾸어 쓸 수 있도록 변환하여 Row 객체를 반환해준다.
Row 객체에 공개되어있는 게터 getter 류의 함수들에 의해 인덱스를 사용하여 개별 필드에 접근할 수 있다.
데이터 세트의 각 아이템들은 곧 하나의 JVM 객체가 되어 쓸 수 있다.
데이터세트 생성
데이터세트를 만들 때에도 데이터 타입들을 모두 알고 있어야 한다.
데이터세트를 만들 때 결과 데이터세트가 쓸 스키마를 지정하는 가장 쉬운 방법은 스칼라의 케이스 클래스 case class 를 사용하는 것이다. 자바라면 자바빈 JavaBean 클래스를 사용할 수 있다.
스칼라: 케이스 클래스
자신만의 특화된 객체를 데이터세트로 초기화해서 만들고 싶으면 스칼라에서 케이스 클래스 를 정의하여 만들 수 있다.
예제로 IoT 디바이스에서 읽어들인 JSON 파일을 살펴보자.
{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, ... }
각 JSON 엔트리를 특화 객체인 DeviceIoTData로 만들기 위해 스칼라 케이스 클래스를 정의할 수 있다.
case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cn: String, device_id: Long, device_name: String ... )
케이스 클래스를 정의한 이후에는 파일을 읽어와서 Dataset[Row]를 Dataset[DeviceIoTData]로 변경할 수 있다.
// 스칼라 예제
val ds = spark.read
.json("JSON파일경로")
.as[DeviceIoTData]
ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level ...]
데이터세트 API도 데이터 프레임 API처럼 다양한 함수를 사용할 수 있다.
filter() 함수
val filterTempDS = ds.filter(d => d.temp > 30 && d.huminity > 70)
filterTempDS: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level...]
filter(func: (T) > Boolean): Dataset[T], 인자로 람다 함수 func: (T) > Boolean을 받는다.
람다 함수의 인자는 DeviceIoTData의 JVM 객체이기 때문에, 점(.) 기호를 사용하여 개별 데이터 필드에 접근이 가능하다.
데이터 프레임 API과 유사하게 filter(), map(), groupBy(), select(), take() 등의 작업들을 수행할 수 있다.
데이터세트는 함수들의 형태나 컴파일 타임 안전성을 보장한다는 점은 RDD와 유사하지만, RDD보다 훨씬 읽기 쉽고 객체 지향 프로그래밍 인터페이스를 가지고 있다.
데이터 프레임 vs 데이터세트
데이터 프레임을 사용하면 좋은 경우
- SQL과 유사한 질의를 쓰는 관계형 변환을 필요로 하는 작업을 수행하는 경우
- 일원화, 코드 최적화, 스파크 컴포넌트들 사이에서의 API 단순화 등을 원하는 경우
- R을 사용하는 경우
- 공간/속도의 효율성을 원하는 경우
- 파이썬을 사용하는 경우 (제어권을 좀 더 갖고 싶다면 RDD로 변경하여 사용)
데이터세트를 사용하면 좋은 경우
- 컴파일 타임에 엄격한 타입 체크를 원하며, 특정한 Dataset[T]를 위하여 여러 개의 클래스 케이스를 만드는 데에 부담이 없는 경우
- 인코더 Encoder 를 써서 프로젝트 텅스텐의 직렬화 능력을 통한 이득을 보고 싶은 경우
데이터 프레임이나 데이터세트를 사용하면 좋은 경우
- 스파크에게 무엇을 해야하는지 를 알려주고 싶은 경우
- 풍부한 표현과 높은 수준의 추상화 및 DSL 연산을 사용하기를 원하는 경우
- 높은 수준의 표현력, 필터, 맵, 집계, 평균과 합계 계산, SQL 질의, 칼럼 지향 접근, 반정형화된 데이터에 대한 관계형 연산 등이 필요한 경우
실행 시에 발생하는 에러를 찾기보다는 컴파일 시에 발생하는 에러 를 찾고 싶다면 아래 그림을 보고 적절한 API를 선택
그럼 RDD는 언제 사용해
스파크가 어떻게 질의를 수행할지 정확하게 지정해주고 싶은 경우에 RDD를 사용하면 된다.
데이터세트나 데이터 프레임에서 RDD로 변환하기 위해서는 API 함수인 df.rdd를 호출하면 된다.
(그러나 당연히 변환하는데에는 비용이 상당히 발생한다 ㅎㅎ 꼭 필요한게 아니라면 지양해야함)
스파크 SQL과 하부의 엔진
효과적인 질의를 구축하고 간단한 코드를 생성하는 과정이 스파크 SQL 엔진이 하는 일이라 할 수 있다.
그 외에도 SQL 엔진이 수행하는 일들은 무엇이 있는지 살펴보자.
- 스파크 컴포넌트들을 통합 하고 데이터 프레임 / 데이터 세트가 스칼라가 지원하는 언어들을 사용하여 정형화 데이터 관련 작업을 단순화시킬 수 있도록 추상화
- 아파치 하이브 메타스토어와 테이블에 접근
- 정형화된 파일 포맷(JSON, CSV, 텍스트, 파케이, 에이브로, ORC 등)에서 스키마와 정형화 데이터를 읽고 쓰며, 데이터를 임시 테이블로 변환
- 빠른 데이터 탐색이 가능하도록 대화형 스파크 SQL 셸을 제공
- 표준 데이터베이스 JCBC/ODBC 커넥터를 통해 외부의 도구들과 연결할 수 있는 중간 역할을 함
- 최적화된 질의 계획과 JVM을 위한 최적화된 코드를 생성
위는 스파크 SQL이 상호 작용하는 컴포넌트들이다.
스파크 SQL 엔진의 핵심에는 상위 수준의 데이터 프레임과 데이터세트 API 및 SQL 쿼리 등을 지원하는 카탈리스트 옵티마이저와 텅스텐 프로젝트가 있다.
카탈리스트 옵티마이저
카탈리스트 옵티마이저는 연산 쿼리를 받아 실행 계획으로 변환하는 역할을 한다.
사용하는 언어에 상관없이 사용자가 실행한 작업은 유사한 실행 계획과 실행을 위한 바이트 코드를 생성하게 된다.
코드가 거치게 되는 다른 스테이지를 보기 위해서는 데이터 프레임에서 아래 코드를 실행시키면 된다.
# 파이썬
데이터 프레임명.explain(True)
// 스칼라
데이터 프레임명.queryExecution.logical // 방법1
데이터 프레임명.queryExecution.optimizedPlan // 방법2
초반의 분석 단계를 지나면 질의 계획은 아래 그림처럼 변환 및 최적화된다.
쿼리 최적화 과정을 한 단계씩 살펴보자.
1단계: 분석
SQL이나 데이터 프레임 쿼리를 위한 추상 문법 트리 abstract syntax tree, AST 를 생성한다. 초기 단계에서는 어떤 칼럼이나 스파크 SQL의 프로그래밍 인터페이스인 Catalog 객체로 접근하여 가져올 수 있다.
2단계: 논리적 최적화
내부적으로 두 가지 단계로 이루어지며 표준적 규칙을 기반으로 하는 최적화 접근 방식을 적용하며 카탈리스트 옵티마이저가 계획들을 수립한다. 이후, 비용 기반 옵티마이저 cost based optimizer, CBO 를 사용하여 각 계획의 비용을 책정한다. 이런 계획들은 연산 트리들로 배열되고, 논리 계획은 물리 계획 수립의 입력 데이터가 된다.
3단계: 물리 계획 수립
논리 계획을 바탕으로 대응되는 물리적 연산자를 사용해 최적화된 물리 계획을 생성한다.
4단계: 코드 생성
각 머신에서 실행할 효율적인 자바 바이트 코드를 생성하는 것을 포함한다. 포괄 whole-stage 코드 생성을 가능하게 하는 프로젝트 텅스턴이 이 단계에서 작업을 수행하게 된다.
(포괄 코드 생성: 물리적 쿼리 최적화 단계 로 전체 쿼리를 하나의 함수로 합치면서 가상 함수 호출이나 중간 데이터를 위한 CPU 레지스터 사용을 없앰)
3장의 내용을 요약하자면, 상위 수준의 데이터 프레임 및 데이터세트 API가 하위 수준의 RDD API에 비해 훨씬 표현력이 높고 직관적이며 스파크 지원 언어 중 어떤 것을 사용하든 스파크 쿼리는 동일한 최적화 과정을 거친다고 할 수 있다.