OK 대충 알겠는데 그래서 어떻게 사용해?
작성자 : 박정현
아파치 스파크 다운로드 및 시작
다운로드 과정은 각자 책을 참고해서 따라하길 바란다.
참고로 이번 포스트는 Google Colaboratory(코랩)
환경에서 spark 사용을 목적으로 작성했다.
책에 있는 코드와 다소 다를 수 있음을 참고하라.
참고로, 책에서는 shell에서 파이썬 스크립트를 실행하는 것을 가정하고 작성했다.
!pip install pyspark # 해당 코드는 코랩에서만 실행할 것.
# 코랩이 아닌 로컬 환경에서 pyspark를 사용할 경우 인터넷이나 책을 참고해서 설치하기 바란다.
스파크 애플리케이션 개념의 이해
-
애플리케이션
API를 사용해서 스파크 위에서 돌아가는 사용자 프로그램. 드라이버 프로그램과 클러스터의 실행기로 이루어진다. -
SparkSession
스파크 코어 기능들과 상호 작용할 수 있는 진입점을 제공하며 그 API로 프로그래밍을 할 수 있게 해주는 객체다. 스파크 shell에서 스파크 드라이버는 기본적으로 SparkSession을 제공하지만 코드에서는 본인이 생성해야함. -
Job
스파크 액션(save(), collect())에 대한 응답으로 생성되는 여러 태스크로 이루어진 병렬 연산 -
스테이지
각 job은 스테이지라 불리는 서로 의존성을 가지는 다수의 태스크 모음으로 나뉨. -
task
스파크 이그제큐터로 보내지는 작업 실행의 가장 기본적 단위
스파크 액션에 해당하는 함수 호출이 없으면 절대로 스파크 작업은 시작하지 않으니 참고하라
스파크 애플리케이션과 SparkSession
모든 스파크 애플리케이션의 핵심에는 스파크 드라이버 프로그램이 있다. 이 드라이버는 SparkSession 객체를 생성한다.
Spark shell을 사용해서 작업을 할 때는 드라이버가 포함되어 실행된다.(시작과 동시에 알아서 만들어짐)
스파크 잡
spark shell과 상호 작용하는 작업 동안, 드라이버는 스파크 애플리케이션을 하나 이상의 스파크 잡으로 변환한다.
그리고 각 잡은 DAG로 변환된다. 본질적으로 이것이 스파크의 실행 계획이 되며 DAG그래프에서 각 노드가 하나 이상의 스파크 스테이지에 해당한다.
드라이버는 여러개의 잡을 생성할 수 있음
스파크 스테이지
특정 작업이 연속적으로 또는 병렬적으로 수행되는지에 맞춰 스테이지에 해당하는 DAG 노드가 생성된다. 모든 스파크 연산이 하나의 스테이지 안에서 실행될 수는 없으므로 여러 스테이지로 나뉘어야 한다.
참고로 스파크 연산은 DAG 그래프로 나타내 진다.
가끔 스파크 이그제큐터끼리 데이터 전송이 이루어지는 연산 범위 경계 위에서 스테이지가 결정되기도
즉, 스파크 잡은 하나 이상의 스테이지를 생성할 수 있다
스파크 태스크
각 스테이지는 최소 실행 단위이며 스파크 이그제큐터들 위에서 연합 실행되는 스파크 태스크(task)로 이루어진다.
각 태스크는 개별 CPU 코어에 할당되고 데이터의 개별 파티션을 갖고 작업한다.
그런 식으로, 16코어 이그제큐터라면 16개 이상의 파티션을 갖는 16개 이상의 태스크를 할당받아 작업하게 되며 이런 식으로 철저한 병렬 처리가 이루어지는 것이다.
스파크 스테이지는 하나 이상의 태스크를 생성
트랜스포메이션, 액션, 지연 평가
분산 데이터의 스파크 연산은 트랜스포메이션(transformation)과 액션(action)으로 구분된다. 트랜스포메이션은 이미 불변성의 특징을 가진 원본 데이터를 수정하지 않고 하나의 스파크 데이터 프레임을 새로운 데이터 프레임으로 그 이름처럼 변형 한다.
즉, select()
혹은 filter()
같은 연산은 원본 데이터 프레임을 수정하지 않으며, 새로운 데이터 프레임 결과를 만들어 반환한다.
따라서 트랜스포메이션 즉, 변형이라는 표현을 사용
모든 트랜스포메이션은 뒤늦게 평가된다. 다시 말해 그 결과는 즉시 계산되는 것이 아니라 리니지(lineage)
라 불리는 형태로 기록된다.
리니지는 실행 계획에서 후반쯤에 스파크가 확실한 트랜스포메이션들끼리 재배열하거나 합치거나 해서 더 효율적으로 수행할 수 있도록 최적화 한다.
나중에 코드를 보면 알겠지만 액션(코드에서 show() 함수가 보이기 전까지 결과를 나타내지 않음)
지연 평가는 액션이 실행되는 시점이나 데이터에 실제 접근하는 시점(I/O)까지 실제 실행을 미루는 스파크의 전략임
하나의 액션은 모든 기록된 트랜스포메이션의 지연 연산을 발동시킴.
데이터 프레임의 트랜스포메이션 연산을 100번해도 액션이 없으면 결과를 안보임
지연 평가는 스파크가 사용자의 연계된 트랜스포메이션들을 살펴봄으로써 쿼리 최적화를 가능하게 하는 반면, 리니지와 데이터 불변성은 장애에 대한 데이터 내구성을 제공한다.
데이터 내구성 근거 : 트랜스포메이션은 데이터 프레임을 변형해서 새로 생산한다고 했음. 따라서 언제든 재실행을 통해 원래 상태를 다시 만들어 낼 수 있음
대표적인 트랜스포메이션 연산
orderBy()
groupBy()
filter()
select()
join()
대표적인 액션 연산
show()
take()
count()
collect()
save()
액션과 트랜스포메이션들은 스파크 쿼리 계획이 만들어지는 데에 도움을 주며 다음 장에서 다룰 것이다.
하나의 쿼리 계획 안의 어떤 것도 액션이 호출되기 전에는 실행되지 않는다.
아래 예제를 보면 2개의 트랜스포메이션 read()
, filter()
과 하나의 액션 show()
를 보게 될 것이다.
액션은 쿼리 실행 계획의 일부로 기록된(리니지에) 모든 트랜스포메이션들의 실행을 시작한다.
from pyspark.sql import SparkSession
# 스파크 세션 생성
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# 데이터 읽음(트랜스포메이션 read)
strings = spark.read.csv("mnm_dataset.csv", header=True)
# 트랜스포메이션 select
filtered = strings.select("Color")
# 액션을 실행하지 않은 경우
filtered
DataFrame[Color: string]
from pyspark.sql import SparkSession
# 스파크 세션 생성
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# 데이터 읽음(트랜스포메이션 read)
strings = spark.read.csv("mnm_dataset.csv", header=True)
# 트랜스포메이션 select
filtered = strings.select("Color")
# 액션 실행
filtered.show()
+------+
| Color|
+------+
| Red|
| Blue|
| Blue|
| Blue|
|Yellow|
| Blue|
|Yellow|
| Green|
| Green|
| Green|
| Green|
| Brown|
|Yellow|
| Blue|
| Brown|
| Red|
|Orange|
| Blue|
| Brown|
| Red|
+------+
only showing top 20 rows
좁은/넓은 트랜스포메이션
트렌스포메이션은 스파크가 지연 평가하는 연산 종류다. 지연 연산 개념의 큰 이득은 스파크가 연산 쿼리를 분석하고 어디를 최적화할지 알 수 있다는 점이다.
예를 들어 Join이나 Pipelining이 될 수도 있고 연산들을 한 스테이지(최소 실행 단위)로 합치거나 반대로 어떤 연산이 셔플이나 클러스터 데이터 교환이 필요한지 파악해 조치할 수 있다.
트랜스포메이션은 좁은(narrow) 의존성, 넓은(wide) 의존성으로 분류 할 수 있다. 하나의 입력 파티션을 연산하여 하나의 결과 파티션을 내놓는 트랜스포메이션은 어느 것이든 좁은 트랜스포메이션이다.
예를 들어 앞의 예제 filter()
와 트랜스포메이션 연산인 contains()
는 하나의 파티션을 처리하여 데이터 교환 없이 결과 파티션을 생성한다.
이를 좁은 트랜스포메이션이라 부른다.
반대로 groupBy()
, orderBy()
는 스파크가 넓은 트랜스포메이션을 수행하게 하는데, 이는 다른 파티션으로부터 데이터를 읽어 들여서 합치고 디스크에 쓰는 등의 일을 하기 때문이다.
너무 어려워 하지말고 filter나 contains는 데이터 프레임에서 조사하는 것인 반면
groupBy, orderBy는 데이터프레임에서 새로운 조각들을 구성해야하는 연산이니 넓다고 표현하는 것임
스파크 UI
아마 가장 좋아할 것 같은데
스프카 애플리케이션을 UI를 통해 살펴볼 수 있게 해준다. 다양한 레벨(잡, 스테이지, 태스크)에서 확인 가능하다.
스파크가 배포된 방식에 맞춰 드라이버가 웹 UI를 띄우게 되며 기본적으로 4040 포트를 사용함.
스파크 UI에서 볼 수 있는 것은
- RDD 크기와 메모리 사용 요약
- 환경 정보
- 실행 중인 이그제큐터 정보
- 모든 스파크 SQL 쿼리
뭐 자세한 내용은 책을 통해 참고 바란다.
첫 단독 애플리케이션
서론에서도 말했지만 책에서는 databricks라는 사이트를 활용해서 실습하거나 shell을 통해서 실습한다.
나는 여러분의 편의를 위해(?) 코랩에 맞춰서 코드를 변형해서 작성하고 있다.
그래서 지금 적을 내용은 책의 일부를 각색한 것이니 오해말길.
쿠키 몬스터를 위한 M&M 세기라는 주제로 쓸모없는 짓을 하긴할 것임
어디까지 예제니까 처음엔 그냥 우와 하셈
우리가 알고 싶은 분석 결과는 미국의 각 주(state)별로 학생들이 어떤 색깔의 M&M을 좋아하는지 알려줄 것이다.
아래 코드를 참고하라
# 깃허브 데이터 불러오기
import requests
import os
def download_csv_from_github_raw(raw_url, save_path):
try:
response = requests.get(raw_url)
response.raise_for_status() # Check for any request errors
with open(save_path, 'wb') as file:
file.write(response.content)
print(f"CSV file downloaded successfully and saved at '{save_path}'.")
except requests.exceptions.HTTPError as http_err:
print(f"HTTP error occurred: {http_err}")
except Exception as err:
print(f"An error occurred: {err}")
# 사용 예시
raw_url = 'https://raw.githubusercontent.com/DAU-BigDataTeams/LearningSparkV2/master/chapter2/py/src/data/mnm_dataset.csv' # GitHub CSV 파일의 raw 주소
save_path = 'mnm_dataset.csv' # 저장할 파일 경로 및 이름을 지정
download_csv_from_github_raw(raw_url, save_path)
# 참고로 pandas를 사용하는 경우
# pd.read.csv(url) 바로 사용하면 끝
CSV file downloaded successfully and saved at 'mnm_dataset.csv'.
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = (SparkSession
.builder
.appName("PythonMnMCount")
.getOrCreate())
# 파일 경로 지정
mnm_file = "mnm_dataset.csv"
# 스키마 추론과 쉼표로 구분된 칼럼 이름이 제공되는 헤더가 있음(format, option 2, 3)
# 설정을 마치면 CSV 포맷으로 파일을 읽어 데이터 프레임에 저장함.
mnm_df = (spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(mnm_file))
mnm_df.show(n=5, truncate=False)
# 데이터 프레임 고수준 API를 사용하고 RDD는 전혀 쓰지 않는다는 점에 주목하라
# 일부 스파크 함수들은 동일한 객체를 되돌려 주므로 함수 호출을 체이닝할 수 있다.
# 1. 데이터 프레임에서 "State", "Color", "Count" 필드를 읽는ㄷ나.
# 2. 각 주별로, 색깔별로 그룹화하기 원하므로 groupBy()를 사용한다.
# 3. 그룹화된 주/색깔별로 모든 색깔별 집계를 한다.
# 4. 역순으로 orderBy() 한다.
count_mnm_df = (mnm_df.select("State", "Color", "Count") # 지금 사용하는 API가 고수준 의미
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# 모든 주와 색깔별로 결과를 보여준다
# show()는 액션이므로 위의 쿼리 내용들이 시작되게 된다는 점에 주목
count_mnm_df.show(n=60, truncate=False)
print("Total Rows = %d" % (count_mnm_df.count()))
# 위 코드는 모든 주에 대한 집계를 계산했지만
# 만약 특정 주에 대한 데이터, 예를 들면 CA에 대해 보기를 원한다면?
# 1. 데이터 프레임에서 모든 줄을 읽는다.
# 2. CA주에 대한 것만 걸러낸다.
# 3. 위에서 했던 것처럼 주와 색깔별롸 groupBy() 한다
# 4. 각 색깔별로 카운트를 합친다.
# 5. orderBy()로 역순 정렬
# 필터링을 통해 캘리포니아의 결과만 찾아낸다.
ca_count_mnm_df = (mnm_df.select("*")
.where(mnm_df.State == 'CA')
.groupBy("State", "Color")
.sum("Count")
.orderBy("sum(Count)", ascending=False))
# show the resulting aggregation for California
ca_count_mnm_df.show(n=10, truncate=False)
spark.stop()
+-----+------+-----+
|State|Color |Count|
+-----+------+-----+
|TX |Red |20 |
|NV |Blue |66 |
|CO |Blue |79 |
|OR |Blue |71 |
|WA |Yellow|93 |
+-----+------+-----+
only showing top 5 rows
+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA |Yellow|100956 |
|WA |Green |96486 |
|CA |Brown |95762 |
|TX |Green |95753 |
|TX |Red |95404 |
|CO |Yellow|95038 |
|NM |Red |94699 |
|OR |Orange|94514 |
|WY |Green |94339 |
|NV |Orange|93929 |
|TX |Yellow|93819 |
|CO |Green |93724 |
|CO |Brown |93692 |
|CA |Green |93505 |
|NM |Brown |93447 |
|CO |Blue |93412 |
|WA |Red |93332 |
|WA |Brown |93082 |
|WA |Yellow|92920 |
|NM |Yellow|92747 |
|NV |Brown |92478 |
|TX |Orange|92315 |
|AZ |Brown |92287 |
|AZ |Green |91882 |
|WY |Red |91768 |
|AZ |Orange|91684 |
|CA |Red |91527 |
|WA |Orange|91521 |
|NV |Yellow|91390 |
|UT |Orange|91341 |
|NV |Green |91331 |
|NM |Orange|91251 |
|NM |Green |91160 |
|WY |Blue |91002 |
|UT |Red |90995 |
|CO |Orange|90971 |
|AZ |Yellow|90946 |
|TX |Brown |90736 |
|OR |Blue |90526 |
|CA |Orange|90311 |
|OR |Red |90286 |
|NM |Blue |90150 |
|AZ |Red |90042 |
|NV |Blue |90003 |
|UT |Blue |89977 |
|AZ |Blue |89971 |
|WA |Blue |89886 |
|OR |Green |89578 |
|CO |Red |89465 |
|NV |Red |89346 |
|UT |Yellow|89264 |
|OR |Brown |89136 |
|CA |Blue |89123 |
|UT |Brown |88973 |
|TX |Blue |88466 |
|UT |Green |88392 |
|OR |Yellow|88129 |
|WY |Orange|87956 |
|WY |Yellow|87800 |
|WY |Brown |86110 |
+-----+------+----------+
Total Rows = 60
+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA |Yellow|100956 |
|CA |Brown |95762 |
|CA |Green |93505 |
|CA |Red |91527 |
|CA |Orange|90311 |
|CA |Blue |89123 |
+-----+------+----------+
애플리케이션 빌드하기
이제 sbt(Scala Build Tool)로 스칼라 스파크 프로그램을 어떻게 빌드하는지 보자.
참고로 파이썬은 컴파일이 없어서 pyc라는 파일을 만들어 냄
build.sbt는 Makefile 처럼 스칼라 컴파일러가 사용자의 스칼라 관련 작업, 예를 들어면 jar 파일 생성, 패키징, 의존성 관리, 디렉터리 지정 등을 어떻게 처리할지 기술해 놓은 명세 파일이다.
여러분의 M&M 코드를 위해서는 다음의 간단한 sbt 파일을 쓸 것이다.
파이썬으로 작업하는 사람은 넘어가
요약
간단한(것 같은) 3가지 단계를 거쳐 스파크를 시작하기 위해 필요한 과정들을 마쳤다.
트랜스포메이션과 액션의 특징을 알고 실제로 사용해보며 경험했다.
끝!