Spark 뭔지 알고 써야할 것 아냐?
작성자 : 박정현
아파치 스파크 소개 : 통합 분석 엔진
아파치 스파크의 시작과 그 아래 깔린 철학을 살펴본다. 또한 스파크 프로젝트의 핵심 컴포넌트와 분산 아키텍처도 살펴본다.
스파크의 시작
아파치 스파크의 발전 과정을 살펴본다. 아파치 스파크가 어떻게 생겨났고, 어떤 것으로 부터 영감을 받았는지, 그리고 어떻게 커뮤니티에서 실질적인 업계 표준인 빅데이터 통합 처리 엔진으로 채택되었는지 그 과정을 살펴볼 것이다.
빅데이터와 구글에서의 분산 컴퓨팅
기존 RDBMS(관계형 데이터 베이스)로는 구글이 색인을 만들고 검색할 규모의 인터넷 문서를 다루는 것은 불가능했다.
-> 새로운 접근 방식이 필요해서 여러가지 프로그램을 만들었음
클러스터의 워커 노드들은 중간 연산을 통해 집계하고, 결과를 합쳐 리듀스 함수에서 최종 결과를 생산 후 이를 접근 가능한 분산 저장소에 기록함.
이러한 방식은 네트워크 트래픽을 크게 감소시키지만 로컬 디스크에 대한 IO를 극대화.
HDFS(하둡 파일시스템)에서 돌아가는 맵리듀스 프레임워크에는 몇 가지 단점이 존재
- 번거로운 운영 복잡도로 인해 관리의 어려움
- 일반적인 배치 처리를 위한 맵리듀스 API는 장황하고 많은 양의 코드를 필요로 했음. 또한 장애 대응은 불안정했다.
- 방대한 배치 데이터 작업을 수행하면서 많은 MR(MapReduce) 태스크가 필요해지면 각 태스크는 이후의 단계들을 위해 중간 과정의 데이터를 로컬 디스크에 써야 했음
- 때문에 디스크 IO가 극대화되었고 결과적으로 많은 양의 MR을 처리하는 데 오랜 시간이 걸림.
- 하둡 MR은 일반적인 배치 처리를 위한 대규모 작업에는 적당하지만 머신러닝, 스트리밍, 상호 반응하는 SQL 계통의 질의 등 다른 워크로드와 연계해 쓰기엔 한계가 있다.
결국 이러한 단점을 극복하고 간단하고 빠르게 사용하고자 탄생한 것이 아파치 스파크다
아파치 스파크 핵심 특성
-
속도
스파크는 다양한 방법으로 속도에 대한 목표를 추구함. 우선 최근 CPU, 메모리의 성능이 향상했고, 스파크의 내부 구현은 이로부터 많은 이득을 얻음. 스파크는 질의 연산을 방향성 비순환 그래프(DAG)로 구성함. DAG의 스케줄러와 질의 최적화 모듈은 효율적인 연산 그래프를 만들어서 각각의 태스크로 분해하여 클러스터의 워크 노드 위에서 병렬 수행 가능하게 지원함. 모든 중간 결과는 메모리에 유지되며, 디스크 IO를 제한적으로 사용하므로 성능이 크게 향상됨 -
사용 편리성 스파크는 데이터 프레임이나 데이터세트 같은 고수준 데이터 추상화 계층 아래에서 유연한 분산 데이터세트(RDD)라 불리는 핵심적이면서도 단순한 논리 자료구조를 구축하여 단순성을 실현함. 연산의 종류로서 트랜스포매이션(transformation)과 액션(action)의 집합과 단순한 프로그래밍 모델을 제공함으로써 사용자들이 각자 편한 언어로 빅데이터 애플리케이션을 만들 수 있도록 함.
-
모듈성 스파크 연산은 다양한 타입의 워크로드에 적용 가능하며, 지원하는 모든 프로그래밍 언어로 표현할 수 있다. 하나의 스파크 애플리케이션을 작성함으로써 모든 것이 실행 가능해지며 전혀 다른 작업을 위해 별도의 엔진을 돌릴 필요도, API를 배울 필요도 없다.
-
확장성 스파크는 저장보다는 빠른 병렬 연산 엔진에 초점이 맞춰져 있다. 저장과 연산을 모두 포함하는 아파치 하둡과 달리 스파크는 이 둘을 분리했다. 스파크가 수많은 데이터 소스에서 데이터를 읽어 들여 메모리에서 처리 가능하다는 의미다. 스파크의
DataFrameReader
나DataFrameWriter
또한 아파치 카피카, 키네시스, 애저 저장소, AWS S3 등 더 많은 데이터 소스에 데이터를 읽어 들여서 논리적인 데이터 추상화 레벨에서 처리할 수 있도록 확장 가능하다.
단일화된 스택으로의 아파치 컴포넌트
다양한 워크로드를 위한 라이브러리로 스파크 SQL, MLlib, 정형화 스트리밍, GraphX를 제공한다. 이 각각의 컴포넌트는 스파크의 중심 장애 대응 엔진과는 별도로 존재하며, API를 써서 스파크 애플리케이션을 만들면 스파크 코어 엔진이 적절한 DAG로 변환해 실행한다. 그러므로 자바, R, 스칼라, SQL, 파이썬 중 어느 것으로 스파크 코드를 작성해 정형화 API를 사용하더라도 실제 코드는 고도로 경량화된 바이트코드로 변환되어 클러스터 전체에 나뉘어 워커 노드의 JVM에서 실행됨.
스파크 SQL
구조화된 데이터와 잘 동작한다. RDBMS 테이블이나 구조화된 데이터의 파일 포맷에서 데이터를 읽어 들일 수 있으며(CSV, txt, JSON, Avro, ORC, Parquet 등) 데이터로 스파크에서 영구적이거나 임시적인 테이블을 만들 수 있다.
아래 예제를 보자 아마존 S3에 저장된 JSON 파일을 읽어서 임시 테이블을 만들고 SQL 계통의 질의로 결과를 가져와 스파크 데이터 프레임으로 만든다.
// 스칼라 예제
// 아마존 S3 버킷에서 데이터를 읽어 들여 스파크 데이터 프레임으로 만든다.
spark.read.json("s3://apach_spark/data/committers.json").createOrReplaceTempView("committers")
// SQL 질의 실행 후 결과를 스파크 데이터 프레임으로 받음
val results = spark.sql("""SELECT name, org, module, release, num_commits
FROM committers WHERE module = 'mllib' AND num_commits > 10
ORDER BY num_commits DESC""")
이러한 코드를 파이썬, R, 자바로 사용 가능하다. 생성된 바이트 코드도 서로 동일하므로 같은 수준의 성능을 보인다.
스파크 MLlib
범용 머신러닝 알고리즘들을 포함한 라이브러리가 들어 있다. 스파크의 첫 릴리스 후 라이브러리 성능은 스파크 2.x대의 엔진 개선 후 비약적으로 상승했다. MLlib은 모델을 구축하기 위한 고수준 데이터 프레임 기반 API 기반으로 여러 인기 있는 머신러닝 알고리즘을 제공한다.
아파치 스파크 1.6을 시작으로 MLlib 프로젝트는 두 개의 패키로 분리됨(spark.mllib, spark.ml) 후자는 데이터 프레임 기반 API, 전자는 RDD 베이스이고 지금은 유지보수 모드임.
모든 새로운 기능은 spark.ml 밑으로 들어간다. 책에서 MLlib이라는 표현인 아파치 스파크 머신러닝 통합 라이브러리를 의미
이 API들은 특성들을 추출하고 변형하고 파이프라인을 구축하고(훈련이나 평가를 위해) 배포하는 동안 모델을 보존해 준다.
그 외의 추가적인 도구들은 일반적인 선형대수 연산을 사용하게 해 준다.
MLlib은 경사 하강법 최적화를 포함한 다른 저수준 ML 기능을 포함한다. 아래의 코드는 기본적인 연산을 요약한 것이다.(확장판은 10장, 11장 참고)
from pyspark.ml.classification import LogisticRegression
training = spark.read.csv("s3://...")
test = spark.read.csv("s3://.,..")
# 훈련 데이터 로드
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# 모델 적합화
lrModel = lr.fit(training)
# 예측
lrModel.transform(test)
스파크 정형화 스트리밍
빅데이터 개발자들에게 아파치 카프카, 데이터 소스에서 들어오는 스트리밍 데이터든 정적 데이터든 실시간으로 연결하고 반응하기 위해 필요한 이 새로운 모델은 스트림을 연속적으로 증가하는 테이블이자, 끝에 계속 새로운 레코드가 추가되는 형태로 본다.
때문에 우리는 이를 정형화 테이블로 바라보고 정적인 테이블에 하듯이 그냥 쿼리만 날리면 된다.
다음 코드 예제는 전형적인 정형화 스트리밍 애플리케이션의 형태다. 로컬 호스트 소켓에서 데이터를 읽고 단어 세기 예제 결과를 아파치 카파카 토픽에 쓴다.
# 로컬 호스트에서 스트림을 읽어 들인다.
from pyspark.sql.functions import explode, split
lines = (spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load())
# 트렌스포메이션 수행
# 라인별로 읽어 단어별로 나눈다.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
# 단어 세기를 수행한다.
word_counts = words.groupBy("word").count
# 결과 스트림을 카프카에 쓴다.
query = (word_counts.writeStream.format("kafka").option("topic", "output"))
GraphX
그래프를 조작하고 그래프 병렬 연산을 수행하기 위한 라이브러리다. 분석, 연결 탐색 등의 표준적인 그래프 알고리즘을 제공하며 커뮤니티의 사용자들이 기여한 페이지랭크, 연결 컴포넌트, 삼각 집계 등의 알고리즘도 포함한다.
val graph = Graph(vertices, edges)
messages = spark.textFile("hdfs://...")
val graph2 = graph.joinVertices(messages){
(id, vertext, msg) => ...
}
아파치 스파크의 분산 실행
스파크의 분산 아키텍처 위에서 모든 컴포넌트들이 같이 동작하면서 서로 통신하는지, 어떤 식으로 배포가 가능한지 등을 알아둘 필요가 있다.
개별 컴포넌트들을 보고 아키텐처에 어떻게 맞춰지는지 살펴보자. 스파크 아키텍처를 넓은 범위에서 보면, 하나의 스파크 애플리케이션은 스파크 클러스트의 병렬 작업들을 조율하는 하나의 드라이버 프로그램에서 이루어진다.
드라이버는 SparkSession
객체를 통해 클러스트의 분산 컴포넌트들에 접근한다.
스파크 드라이버
SparkSession 객체를 초기화하는 책임을 가진 스파크 애플리케이션의 일부임
스파크 드라이버는 여러가지 역할을 수행함.
클러스터 매니저와 통신하며 스파크 이그제큐터들을 위해 필요한 자원을 요청하고 모든 스파크 작업을 DAG 연산 형태로 변환, 스케줄링하며 각 실행 단위를 태스크로 나누어 스파크 이그제큐터들에서 분배함.
자원이 일단 할당된다면 그 다음부터 드라이버는 이그제큐터와 직접 통신함.
- 이그제큐터 : 클러스터의 각 워커 노드에서 동작한다. 이그제큐터는 드라이버 프로그램과 통신하며 워커에서 태스크를 실행하는 역할을 한다. 대부분의 배포 모드에서 노드당 하나의 이그제큐터만이 실행된다.
SparkSession
스파크 2.0 부터 SparkSession은 모든 스파크 연산과 데이터에 대한 통합 연결 채널이 됨.
이전의 SparkContext, SQLContext, HiveContext, SparkConf, StreamingContext
등을 합쳐 놓았을 뿐 아니라 스파크 작업을 훨씬 간단하고 쉽게 만들어 준다.
모든 컨텍스트 객체들을 통합하긴 했지만 여전히 개별 컨텍스트 객체에 액세스하거나 관련 함수 사용은 가능했다. 이는 하위 호환성 유지를 위한 것이었으며, 그러므로 1.x에서 작성한 SparkContext나 SQLContext를 사용하는 코드도 여전히 동작이 가능했다.
import org.apache.spark.sql.SparkSession
//SparkSession 생성
val spark = SparkSession
.builder
.appName("LearnSpark")
.config("spark.sql.shuffle.partitions", 6)
.getOrCreate()
... 중략
// JSON 일기에 스파크 세션 사용
val people = spark.read.json("...")
... 중략
// SQL 실행에 스파크 세션 사용
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")
클러스터 매니저
클러스터 매니저는 스파크 애플리케이션이 실행되는 클러스터에서 자원을 관리 및 할당하는 책임을 지님.
- 내장 단독 클러스터 매니저 (standalone)
- 아파치 하둡 얀(YARN)
- 아파치 메소스(Mesos)
- 쿠버네티스(Kubernetes)
배포 모드
스파크의 매력적인 특징은 스파크가 여러 다른 환경에서 다른 설정으로 돌아갈 수 있도록 다양한 배포 모드를 지원하는 것임.
클러스터 매니저는 어디서 실행되는지에 대한 자세한 정보 없이 돌아갈 수 있도록 추상화되어 있다.
때문에 쿠버네티스 같은 인기 있는 환경에서 배포 가능함
모드 | 스파크 드라이버 | 스파크 이그제큐터 | 클러스터 매니저 |
---|---|---|---|
로컬 | 랩톱이나 단일 서버 같은 머신에서 단일 JVM 위에서 실행 | 드라이버와 동일한 JVM 위에서 동작 | 동일한 호스트에서 실행 |
단독 | 클러스터의 아무 노드에서나 실행 가능 | 클러스트의 각 노드가 자체적인 이그제큐터 JVM 실행 | 클러스터의 아무 호스트에나 할당 가능 |
얀(클라이언트) | 클러스트 외부의 클라이언트에서 동작 | 얀의 노드매니저의 컨테이너 | 얀의 리소스 매니저가 얀의 애플리케이션 마스터와 연계하여 노드 매너지에 이그제큐터를 위한 컨테이너 할당 |
얀(클러스터) | 얀 애플리케이션 마스터에서 동작 | 얀(클러스터) 모드와 동일 | 얀(클러스터) 모드와 동일 |
쿠버네티스 | 쿠버네티스 팟에서 동작 | 각 워커가 자신의 팟 내에서 실행 | 쿠버네티스 마스터 |
분산 데이터와 파티션
실제 물리적인 데이터는 HDFS나 클라우드 저장소에 존재하는 파티션이 되어 저장소 전체에 분산된다. 데이터가 파티션으로 되어 물리적으로 분산되면서, 스파크는 각 파티션을 고수준에서 논리적인 데이터 추상화, 즉, 메모리의 데이터 프레임 객체로 바라본다.
이러한 파티셔닝은 병렬 처리를 가능하게 효과적인 돕는다. 청크, 파티션으로 분산해 저장하는 방식은 스파크 이그제큐터가 네트워크 사용을 최소화하며 가까이 있는 데이터만 처리할 수 있도록 해 준다.
즉, 이그제큐터가 쓰는 CPU 코어는 작업해야 하는 데이터의 파티션에 할당된다.
마치며
실제 책에서는 뒤에 내용이 더 있지만
Spark 프레임워크의 특징과 중요한 내용을 중심으로 소개했다.
Spark를 사용하기 앞서 우리가 왜 사용하고 사용하는 프레임워크가 어떤 특징이 있는지 알아야 적재적소에 써먹기 때문이다.
끝!