스파크 기본 설정을 조정하여 성능을 개선시킬 수 있을까?
작성자 : 박민서
스파크 애플리케이션의 최적화 및 튜닝
데이터세트를 사용하는 데에 대한 비용을 줄이는 데에 있어서 스파크를 어떻게 최적화하고 튜닝할 것인지에 대해 살펴보자!
효율적으로 스파크를 최적화 및 튜닝하기
아파치 스파크 설정 확인 및 세팅
스파크 설정을 확인하고 설정하는 세 가지 방법에 대하여 알아보자.
(1) 설정 파일들을 통한 방법
스파크가 설치된 $SPARK_HOME 디렉터리에는 여러 설정 파일들이 있다. (~.~~.template 형식의 파일들) 기본값을 변경한 뒤에 .template 부분을 지우고 새로 다시 저장하면 스파크의 설정을 변경할 수 있다.
(2) –conf 옵션을 쓰거나 스파크 애플리케이션 내에서 직접 설정하는 방법
// 명령 행에서 애플리케이션을 spark-summit으로 제출할 때 --conf 옵션 사용하여 직접 설정
spark-submit --conf spark.sql.shuffle.partitions=5 --conf "spark.executor.memory=2g" --class main.scala.chapter7.SparkConfig_7_1 jars/main- scala-chapter7_2.12-1.-.jar
// 스칼라 예제
import org.apache.spark.sql.SparkSession
def printConfigs(session: SparkSession) = {
// 설정값을 받아옴
val mconf = session.conf.getAll
// 설정값 출력
for (k <- mconf.keySet) println(s"${k} -> ${mconf(k)}\n")
}
def main(args: Array[String]) = {
// 세션 설정
val spark = SparkSession.builder
.config("spark.sql.shuffle.partitions", 5)
.config("spark.executor.memory", "2g")
.master("local[*]")
.appName("SparkConfig")
.getOrCreate()
printConfigs(spark)
spark.conf.set("spark.sql.shuffle.partitions",
spark.sparkContext.defaultParallelism)
println(" ****** Setting Shuffle Partitions to Default Parallelism")
printConfigs(spark)
}
(3) 스파크 셸에서 프로그래밍 인터페이스 사용
API는 스파크와 상호 작용을 하기 위한 가장 근본적인 방법으로, SparkSession 객체를 사용하면 대부분의 스파크 설정에 접근 가능하다.
현재의 설정값을 프로그래밍으로 변경하려면 우선 값이 변경 가능한지 확인해보아야 한다. **spark.conf.isModifiable(“<설정필드 이름="">")** 을 수정 가능 여부를 알수 있고, 수정 가능한 설정값은 API를 통하여 변경 가능하다.설정필드>
// 스칼라 예제
scala> spark.conf.get("spark.sql.shuffle.partitions") // 설정값 확인
scala> spark.conf.set("spark.sql.shuffle.partitions", 5) // 설정값 변경
# 파이썬 예제
>>> spark.conf.get("spark.sql.shuffle.partitions") # 설정값 확인
>>> spark.conf.set("spark.sql.shuffle.partitions", 5) # 설정값 변경
스파크 설정값을 결정하는 방법들 간 우선순위
(1) spark-default.conf 에 정의된 값이나 플래그
(2) spark-submit의 명령 행 설정
(3) 스파크 애플리케이션에서 SparkSession을 통해 설정된 값
대규모 워크로드를 위한 스파크 규모 확장
대규모 스파크 워크로드 시 자원 부족이나 점적 성능 저하에 의한 작업 실패를 피하기 위해 스파크 설정을 조정할 수 있다. 이 설정들은 스파크 드라이버, 이그제큐터, 이그제큐터에서 실행되는 셔플 서비스 이렇게 3가지의 스파크 컴포넌트에 영향을 미친다.
스파크 드라이버는 클러스터 매니저와 함께 클러스터에 이규제큐터들을 띄우고, 이규제큐터 위에서 돌아갈 스파크 태스크들을 스케줄링하는 역할을 수행한다.
정적/동적 자원 할당
spark-submit에 명령 행 인자를 사용하여 자원량에 제한을 둘 수 있다. 그렇게 제한을 두게 되면 시작할 때의 워크로드보다 더 방대한 작업으로 인해 드라이버에 나중에 태스크들이 기다리는 상황이 발생해도 스파크가 지정한 이상의 추가적인 자원들을 더 할당할 수 있다.
스파크의 동적 자원 할당 설정을 사용하면 요청에 맞춰서 컴퓨팅 자원을 할당하거나 줄이록 요청할 수 있게 된다. 동적 할당을 활성화하고 설정하기 위해서는 프로그래밍을 통하여 설정해야 한다.
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
spark.dynamicAllocation.enabled 의 default 값은 false 임에 유의해야 한다.
스파크 이규제큐터의 메모리와 셔플 서비스 설정
이규제큐터 메모리가 어떤 식으로 구성되고 스파크가 어떻게 사용되는지 알아두어야 메모리 부족 문제에 시달리지 않을 수 있다.
각 이그제큐터에서 사용 가능한 메모리의 양은 spark.executor.memory에 의해 제어된다. 아래의 그림에서 보듯 실행 메모리, 저장 메모리, 예비 메모리 의 세 부분으로 나뉘어지는데, OOM 에러를 예방하기 위한 목적인 예비 메모리에 300MB를 할당하고, 나머지의 60%를 실행 메모리, 40%를 저장 메모리에 할당한다. spark.executor.memory 의 비율을 원하는 수치로 수정 가능하다.
실행 메모리는 스파크의 셔플, 조인, 정렬, 집계 등에 사용되며 저장 메모리는 사용자 데이터 구조를 캐싱하고 데이터 프레임에서 온 파티션들을 저장하는 데에 주로 사용된다.
맵이나 셔플 작업이 이루어지는 동안 스파크는 로컬 디스크의 셔플 파일에 데이터를 쓰고 읽으므로 큰 I/O 작업이 발생한다. 그러나 기본 설정들은 대용량 스파크 작업에 최적화 되어있지 않으므로 병목 현상이 발생할 수 있다.
아래는 병목 현상 발생 위험성을 줄이기 위한 설정들이다.
스파크 병렬성 최대화
스파크의 유용성의 많은 부분들은 여러 태스크를 동시에 대규모로 실행시킬 수 있는 능력에 기인한다. 병렬성을 최대화하기 위하여 파티션이 어떤 역할을 하는지 무엇인지 알 필요가 있다.
데이터 관리 용어에서 파티션 이란 데이터를 관리 가능하고 쉽게 읽어들일 수 있도록 디스크에 연속된 위치에 조각이나 블록들의 모음으로 나누어 저장하는 방법 이다. 각 데이터 모음들은 병렬적/독립적으로 읽어서 처리가 가능하고, 하나의 프로세스 내에서 멀티 스레딩으로 처리 가능하다. 특히, 독립적으로 처리 가능한 특성은 대규모 병렬 데이터 처리를 가능하게 하는 중요한 포인트이다.
스파크는 태스크를 스케줄링을 하면서 파티션을 처리하게 될 것이다. 자원사용을 최적화하고 병렬성을 최대로 끌어올리려면 이그제큐터에 할당된 코어 개수만큼 파티션들이 최소한으로 할당되면 된다. 파티션이 가장 기본적인 병렬성의 한 단위라고 생각할 수도 있고 하나의 코어에서 돌아가는 하나의 스레드는 하나의 파티션을 처리할 수 있다.
파티션은 어떻게 만들어지는가
스파크의 태스크들은 데이터를 디스크에서 읽어 메모리로 올리면서 파티션 단위로 처리한다. 이 때 디스크의 데이터는 저장장치에 따라 조각이나 연속된 파일 블록으로 존재하는데 이런 블록들이 모여서 하나의 파티션을 구성하게 된다.
스파크에서 한 파티션의 크기 는 spark.sql.files.maxPartitionByte 에 따라 결정되며 기본 값은 128MB이다. 이 크기를 줄이게 되면 작은 파티션 파일이 많아지면서 디스크 I/O 양이 급증하는 ‘작은 파일 문제’ 가 발생한다. 그에 따라 성능 저하가 일어나며 분산 파일 시스템이 느려질 수 있다.
파티션은 데이터 프레임 API의 특정 함수들을 사용하면 만들어지기도 한다.
// 스칼라 예제
// 큰 파일을 읽어오면서 명시적으로 파티션을 만들도록 지시함
val ds = spark.read.textFile("../README.md").repartition(16)
// 큰 데이터프레임을 생성하면서 명시적으로 파티션을 만들도록 지시함
val numDF = spark.range(1000L * 1000 * 1000).repartition(16)
셔플 단계에서 만들어지는 셔플 파티션
셔플 파티션 개수는 spark.sql.shuffle.partitions에 50으로 지정되어 있다. 데이터 사이즈 크기에 따라 숫자를 조절하여 너무 작은 파티션이 이규제큐터에 할당되지 않도록 조정할 수 있다.
groupBy()나 join()같이 넓은 트랜스포메이션 작업 중에 생성되는 셔플 파티션은 네트워크와 디스크 I/O를 모두 사용하게 되는데, SSD 디스크를 장착해 두면 성능을 향상시킬 수 있다.
성능을 올리기 위해서는 자주 쓰는 데이터 프레임이나 테이블 데이터에 대해 캐싱/영속화를 고려해보아야 한다.
데이터 캐싱과 영속화
캐싱과 영속화의 차이는 무엇일까? 스파크에서는 두 단어가 서로 동의어라고 볼 수 있다. cache()와 persist()의 두 가지 API 호출이 이 기능들을 제공한다. 후자는 데이터가 저장되는 위치와 방식에 대해 좀 더 세밀한 설정이 가능하다.
DataFrame.cache()
cache()는 허용하는 메모리 수준만큼 이규제큐터들의 메모리에 읽은 파티션을 최대한 저장한다. 데이터 프레임은 그중 일부만 캐시되더라도 각각의 파티션들의 일부만 저장될 수는 없다. (4.5 파티션만 들어갈 수 있으면 4 파티션만 저장됨)
모든 파티션이 캐시된 것이 아니라면 데이터에 다시 접근을 시도할 때 캐시되지 않은 파티션은 재계산 되어야 하며 이는 스파크 잡을 느리게 만든다.
// 스칼라 예제
// 천만 개의 레코드를 가지는 데이터 프레임 생성
val df = spark.range(1*1000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache() // 데이터 캐싱
df.count() // 캐시 수행 => 5.11 sec 소요
df.count() // 캐시 사용 => 0.44 sec 소요
처음 count()에서 실제로 캐싱을 수행하게 되고 두 번째 count() 함수 호출 시에는 캐시를 사용하게 되면서 속도가 굉장히 향상된다.
cache()나 persist()를 사용할 때 모든 레코드를 접근하는 액션(대표적으로 count())을 호출하기 전까지는 완전히 캐시되지 않는다는 점을 유의해야 한다.
DataFrame.persist()
persist(StroageLevel.LEVEL)의 함수 호출 방식은 직관적으로 StorageLevel을 통해 데이터가 어떤 방식으로 캐시될 것인지 제어할 수 있다.
동일한 예제를 수행하되 cache() 대신 persist() 함수를 사용해보자.
// 스칼라 예제
// 천만 개의 레코드를 가지는 데이터 프레임 생성
val df = spark.range(1*1000000).toDF("id").withColumn("square", $"id" * $"id")
df.persist(StorageLevel.DISK_ONLY) // 데이터를 직렬화하여 디스크에 저장
df.count() // 캐시 수행 => 2.08 sec 소요
df.count() // 캐시 사용 => 0.38 sec 소요
캐시된 데이터를 비우고 싶다면 DataFrame.unpersist()를 호출하면 된다.
캐시는 데이터 프레임에서 파생된 테이블이나 뷰도 캐시할 수 있다.
캐시나 영속화 사용 팁
캐시를 쓰는 경우 : 큰 데이터세트에 쿼리나 트랜스포메이션으로 반복적으로 접근 해야 하는 경우
-반복적인 머신러닝 학습을 위해 계속 접근해야 하는 데이터 프레임들
-ETL이나 데이터 파이프라인 구축 시 빈도 높은 트랜스포메이션 연산으로 자주 접근해야 하는 데이터 프레임들
아래와 같은 경우에는 캐시나 영속화를 사용하지 않는 것이 좋다.
-데이터 프레임이 메모리에 들어가기에 너무 큰 경우
-크기에 상관없이 자주 쓰지 않는 데이터 프레임에 대해 비용이 크지 않는 트랜스포메이션을 수행하는 경우
다음으로는 비용을 많이 소모하는 데이터의 이동 작업이나 클러스터에 연산과 네트워크 자원을 요구하는 작업인 스파크 조인에 대해 알아보자.
스파크 조인의 종류
조인 연산이란 빅데이터 분석에서 일반적인 트랜스포메이션 연산 형태로, 두 종류의 데이터세트를 공통적으로 일치하는 키를 기준으로 병합하는 연산이다. 조인 연산들은 스파크 이그제큐터들 사이에 방대한 데이터 이동을 일으킨다.
스파크의 조인 전략 중 브로드캐스트 해시 조인 Broadcast Hash Join, BHJ 과 셔플 소트 머지 조인 Shuffle Sort Merge Join, SMJ 에 대해 알아보자.
브로드캐스트 해시 조인
맵사이드 조인 map-side-only join 이라고도 한다. 데이터 이동이 거의 필요 없도록 한쪽은 작고 다른 쪽은 큰 두 종류의 데이터를 사용하여 특정 조건이나 칼럼 기준으로 조인한다. 이 조인 전략은 큰 데이터 교환 발생을 방지 하는 것을 목적으로 한다.
기본적으로 작은 쪽의 데이터가 10MB 이하일 때 브로드캐스트 조인을 사용한다. 두 데이터 프레임에 공통적인 키들이 존재 하고 한쪽이 가지고 있는 정보가 적은데 양쪽의 뷰를 병합 하는 경우에 BHJ를 사용한다.
// 스칼라 예제
import org.apache.spark.sql.functions.broadcast
val joinedDF = playersDF.join(broadcast(clubsDF), "key1 == key2")
BHJ는 어떤 셔플도 일어나지 않기 때문에 스파크가 제공하는 가장 쉽고 빠른 조인 형태라고 할 수 있으며, 브로드캐스팅 이후에는 로컬에서 각 이그제큐터에 필요한 모든 데이터에 접근 가능해진다.
어떤 조인이 사용되는지 물리적 계획을 확인하고 싶다면 아래의 코드 한 줄을 실행하면 된다.
joinedDF.explain(mode)
인자에 입력 가능한 모드로는 ‘simple’, ‘extended’, ‘codegen’, ‘cost’, ‘formatted’가 있다.
브로드캐스트 해시 조인을 사용하는 경우
-양쪽 데이터세트의 각 키가 스파크에서 동일한 파티션 안에 해시되는 경우
-한 데이터가 다른 쪽보다 많이 작은 경우
-정렬되지 않은 키들 기준으로 두 데이터를 결합하면서 동등 조인equi-join을 수행할 때
-더 작은 쪽의 데이터가 모든 스파크 이그제큐터에 브로드캐스트될 때 발생하는 과도한 네트워크 대역폭이나 OOM 오류에 대해 걱정할 필요가 없는 경우
셔플 소트 머지 조인
해시 가능한 공통 키 를 가지면서 공통 파티션에 존재하는 두 가지의 데이터세트 를 합칠 때 사용하면 좋다.
소트 머지라는 이름에서 알 수 있듯이 이 조인 방식은 정렬과 병합의 두 단계가 존재한다. 조인 키를 기준으로 정렬하고 각 데이터세트에서 키 순서대로 데이터를 순회하며 키가 일치하는 로우끼리 병합한다.
소트 머지 조인은 spark.sql.join.preferSortMergeJoin 설정에 의해 활성화 가능하다. 아래의 그림들을 보면 마지막 스테이지에서 Exchange와 Sort 작업이 결과 병합 직전에 일어난다는 것을 알 수 있다. 각 이그제큐터에서 이루어지는 맵 연산 결과물의 셔플인 Exchange 연산은 비용이 많이 드는 작업이며 이그제큐터 간에 네트워크 상으로 파티션 셔플이 요구된다.
셔플 소트 머지 조인 최적화
동등 조건 조인을 빈번하게 수행하고 싶다면 공통의 정렬된 키나 칼럼을 위한 파티션된 버킷을 만들어서 Exchange 단계를 없앨 수 있다.
이러한 방식으로 사전 정렬 및 데이터 재구성을 시도하면 Exchange를 생략하고 WholeStageCodegen으로 넘어가므로 성능이 향상된다.
예를 들어 두 공통 키 uid == users_id 를 조건으로 usersDF와 ordersDF라는 데이터 프레임을 조인해야하는 상황이라고 해보자.
// 스칼라 예제
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
// 파케이 포맷으로 버케팅해 스파크 관리 테이블로 저장한다
usersDF.orderBy(asc("uid"))
.write.format("parquet")
.bucketBy(8, "uid")
.mode(SaveMode.OverWrite)
.saveAsTable("UsersTbl")
ordersDF.orderBy(asc("users_id"))
.write.format("parquet")
.bucketBy(8, "users_id")
.mode(SaveMode.OverWrite)
.saveAsTable("OrdersTbl")
// 테이블 캐싱
spark.sql("CACHE TABLE UsersTbl")
spark.sql("CACHE TABLE OrdersTbl")
// 다시 읽어들임
val usersBucketDF = spark.table("UsersTbl")
val ordersBucketDF = spark.table("OrdersTbl")
// 조인하고 결과를 보여준다
val joinUsersOrdersBucketDF = orderBucketDF
.join(usersBucketDF, $"users_id" === $"uid")
테이블을 정렬된 상태로 저장했기 때문에 SortMergeJoin 동안 정렬할 필요가 없으므로 Exchange를 건너뛰고 바로 WholeStageCodegen으로 진행 가능하다.
버케팅 이전의 과정과 비교해보면 Exchange가 수행되지 않음을 볼 수 있다.
셔플 소트 머지 조인을 사용하는 경우
-두 큰 데이터세트의 각 키가 정렬 및 해시되어 스파크에서 동일 파티션에 위치할 수 있을 때
-동일 조건 조인만을 수행하여 정렬된 동일 키 기반으로 두 데티터세트를 조합하기 원할 때
-네트워크 간에 규모가 큰 셔플을 일으키는 Exchange와 Sort 연산을 피하고 싶은 경우
스파크 UI
스파크 UI를 통하여 메모리 사용량, 잡, 스테이지, 태스크, 이벤트 타임라인, 로그 등에 대해 알 수 있으며, spark-submit은 스파크 UI를 띄운다. 로컬호스트나 스파크 드라이버를 통해 기본 포트 4040으로 연결 가능하다.
스파크 UI 탭은 아래와 같이 6개의 탭으로 있다.
Jobs와 Stage 탭
이 탭에서는 각 탭을 통해 개별 태스크의 디테일까지 살펴볼 수 있다. 각자의 완료 상태와 I/O 관련 수치, 메모리 소비량, 실행 시간 등을 살펴볼 수 있다.
아래는 Jobs 탭의 확장된 이벤트 타임라인으로, 어느 시점에 이그제큐터가 클러스터에 추가되고 삭제되는지 확인할 수 있다. Duration(소요시간) 칼럼은 딜레이를 일으키는 태스크를 찾아내는 단서가 될 수 있다.
아래는 Stages 탭으로 모든 잡에 대한 모든 스테이지의 현재 상태에 대한 요약을 제공한다. 최대 태스크 수행 시간이 평균보다 너무 높은 경우 파티션들끼리 균등하지 못한 데이터 분포에 의해 데이터 불균형이 일어난 것일 수 있다.
Executors 탭
Executors 탭은 애플리케이션에서 생성된 이그제큐터들에 의한 정보를 제공한다. 아래는 Executors 탭이다. 데이터 프레임이나 관리 테이블에서 cache()나 persist()를 썻을 때 사용량을 확인하는 데에 도움이 된다.
Storage 탭
아래는 Storage 탭으로, cache()나 persist()의 결과로 애플리케이션에서 캐시된 데이터 프레임이나 테이블의 정보를 제공한다.
‘In-memory table ‘테이블명’’ 링크를 클릭하면 메모리와 디스크 상에서 한의 이그제큐터와 파티션들이 어떻게 캐시되어 있는지 보여준다.
SQL 탭
스파크 SQL 쿼리가 언제 어떤 잡에 의해 실행되었고 얼마나 걸렸는지 등을 알 수 있는 탭이다.
쿼리의 ‘Desciption’을 클릭하면 아래 그림과 같이 물리적 오프레이터들과 상세 실행 계획을 보여준다.
Environment 탭
어떤 환경 변수가 지정되어 있고 어떤 jar 파일들이 포함되어 있으며 어떤 스파크 특성이나 시스템 특성이 지정되어 있는지 어떤 런타임 환경이 사용되고 있는지 등의 정보를 알 수 있는 탭이다.
끝
스파크 튜닝을 통하여 대규모 워크로드를 개선시키고, 병렬성을 향상시키고, 이그제큐터들의 메모리 부족 현상을 최소화 할 수 있었다.