판다스 UDF와 스파크 SQL에 대하여 알아보자!
작성자 : 박민서
스파크 SQL과 데이터 프레임: 외부 데이터 소스와 소통하기
- 스파크 SQL과 데이터 프레임: 외부 데이터 소스와 소통하기
- 스파크 SQL과 아파치 하이브
- 스파크 SQL 셸; 비라인 및 태블로로 쿼리하기
- 외부 데이터 소스
- 일반적인 데이터 프레임 및 스파크 SQL 작업
- 끝
SQL를 통해 사용자 정의 함수, 외부 데이터 원본과 연결하는 방법, 고차 함수와 일반적인 관계 연산자를 사용하여 작업하는 방법에 대해 알아보자.
스파크 SQL과 아파치 하이브
스파크의 함수형 프로그래밍 API를 통합하는 아파치 스파크의 기본 구성 요소인 스파크 SQL에 대하여 알아보자.
스파크 SQL은 샤크 Shark 이전에 했던 작업을 기원으로 한다. 샤크는 하이브 코드베이스를 기반으로 구축되었으며 하둡 시스템 최초의 대화형 SQL 쿼리 엔진 중 하나가 되었으며 빠르고 확장이 가능한 두 장점을 가질 수 있다는 것을 보여줬다는 데에 의의가 있다.
사용자 정의 함수
사용자 정의 함수 user-defined function, UDF 란 데이터 엔지니어와 데이터 과학자가 자신의 기능을 정의할 수 있는 유연성을 제공하는 함수이다.
스파크 SQL UDF
사용자만의 고유한 파이스파크 또는 스칼라 UDF를 생성하면 사용자도 스파크 SQL 안에서 이를 사용할 수 있다.
UDF는 세션별로 작동하며 기본 메타 스토어에서는 유지되지 않는다.
// 스칼라 예제
// 큐브 함수 생성
val cubed = (s: Long) => s * s * s
// UDF로 등록
spark.udf.register("cubed", cubed)
// 임시 뷰 생성
spark.range(1, 9).createOrReplaceTempView("udf_test")
# 파이썬 예제
from pyspark.sql.types import LongType
# 큐브 함수 생성
def cubed(s):
return s * s * s
# UDF로 등록
spark.udf.register("cubed", cubed, LongType())
# 임시 뷰 생성
spark.range(1, 9).createOrReplace("udf_test")
스파크 SQL을 사용하여 cubed() 함수 실행 가능하다.
// 스칼라 및 파이썬 예제
// 큐브 UDF를 사용하여 쿼리
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()
스파크 SQL에서 평가 순서 및 null 검사
스파크 SQL은 하위 표현식의 평가 순서를 보장하지 않는다.
적절한 null 검사를 수행하기 위해서는
1 UDF 자체가 null을 인식하도록 만들고 UDF 내부에서 null 검사를 수행
2 IF 또는 CASE WHEN 식을 사용하여 null 검사를 수행하고 조건 분기에서 UDF를 호출
판다스 UDF로 파이스파크 UDF 속도 향상 및 배포
파이스파크 UDF가 JVM과 파이썬 사이의 데이터 이동을 필요로 해서 비용이 많이 들었기 때문에 스칼라 UDF보다 성능이 느리다.
이 문제를 해결하기 위해 판다스 UDF(벡터화된 UDF) 가 아파치 스파크 2.3의 일부로 도입되었다.
아파치 애로우를 사용하여 데이터를 전송하고 판다스는 해당 데이터로 작업을 한다.
pandas_udf 키워드를 데코레이터로 사용하여 판다스 UDF를 정의하거나 함수 자체를 래핑할 수 있다.
아파치 애로우 형식에 포함된 데이터라면 파이썬 프로세스에서 사용 가능한 형식이므로 데이터를 직렬화나 피클할 필요가 없다.
판다스 시리즈 또는 데이터 프레임에서 작업한다.
판다스 UDF
pandas.Series, pandas.DataFrame, Tuple 및 Iterator와 같은 파이썬 유형 힌트로 판다스 UDF 유형을 유추한다.
판다스 UDF 유형을 수동으로 정의하고 지정해야 했으나, Series와 Series, Series 반복자와 Series 반복자, 다중 Series 반복자와 Series 반복자, Series와 Scala(단일값)를 파이썬 유형 힌트로 지원한다.
판다스 함수 API
입력과 출력이 모두 판다스 인스턴스인 파이스파크 데이터 프레임에 로컬 파이썬 함수를 직접 적용할 수 있다. 그룹화된 맵, 맵, 공동 그룹화된 맵을 지원한다.
큐브 연산을 수행하는 cubed() 함수
판다스 UDF를 만들기 위해 추가적인 cubed_udf = pandas_udf() 호출이 있는 일반적인 판다스 함수이다.
로컬 함수와 다르게 벡터화된 UDF를 사용할 때 스파크 작업이 실행된다.
pandas_udf 함수의 단계를 더 확실하게 확인할 수 있다.
parallelize()로 시작하여 로컬 데이터(애로우 바이너리) 배치를 이그제규터로 보내고 애로우 바이너리 배치를 스파크의 내부 데이터 형식으로 변환하는 mapPartitions()를 호출하여 스파크 작업자에게 배포 가능하다.
성능의 기본 단계를 나타내는 여러 WholeStageCodegen 단계가 존재한다.
판다스 UDF가 실행되고 있음을 식별하는 것은 ArrowEvalPython 단계이다.
스파크 SQL 셸; 비라인 및 태블로로 쿼리하기
아파치 스파크를 쿼리하는 다양한 메커니즘
-스파크 SQL 셸
-비라인 Beeline CLI 유틸리티
-태블로 및 파워 BI
스파크 셸 사용하기
spark-sql CLI는 로컬 모드에서 하이브 매타스토어 서비스와 통신하는 대신 쓰리프트 Thrift JDBC/ODBC 서버 (일명 스파크 쓰리프트 서버 또는 STS)와는 소통하지 않는다. STS를 사용하면 JDBC/ODBC 클라이언트가 아파치 스파크에서 JDBC/ODBC 프로토콜을 통해 SQL 쿼리를 실행할 수 있다.
스파크 SQL CLI 시작하기
// '$SPARK_HOME' 폴더에서 다음 명령을 실행한다.
./bin/spark-sql
위 명령을 실행 한 후에, 스파크 SQL 쿼리를 대화 형식으로 수행 가능하다.
테이블 만들기
새롭고 영구적인 스파크 SQL 테이블을 생성하는 명령어
spark-sql> CREATE TABLE people (mame STRING, age INT);
테이블에 데이터 삽입하기
INSERT INTO people SELECT name, age FROM ...
기존 테이블이나 파일에서 데이터를 로드하는 데 의존하지 않으므로 INSERT… VALUES문을 사용
spark-sql> INSERT INTO people VALUSE ("DAU", 23);
스파크 SQL 쿼리 실행하기
메타스토어에 있는 테이블 살펴보기
spark-sql> SHOW TABLEs;
조건에 따라 테이블 내의 아이템 보기
spark-sql> SELECT * FROM people WHERE 조건;
비라인 작업
비라인은 SQLLine CLI를 기반으로 하는 JDBC 클라이언트로 스파크 스리프트 서버에 대해 스파크 SQL 쿼리를 실행할 수 있다.
쓰리프트 서버 시작하기
‘$SPARK_HOME’ 폴더에서 아래 명령어 실행
.sbin/start-thriftserver.sh
스파크 드라이버와 워커를 아직 시작하지 않은 경우 start thriftserver.sh 실행 전에 ./sbin/start-all.sh를 먼저 실행하자.
비라인을 통해 쓰리프트 서버에 연결하기
비라인을 사용하여 쓰리프트 JDBC/ODBC 서버를 테스트하려면 아래 명령을 실행
./bin/beeline
비라인을 구성하여 로컬 쓰리프트 서버에 연결
!connect jdbc:hive2://localhost:10000
비라인의 default 상태는 비보안 모드 non-secure mode 이다. 따라서 사용자 이름은 로그인 계정(예: user@learningspark.org) 비밀번호는 비어있다.
쓰리프트 서버 중지하기
./sbin/stop-thriftserver.sh
태블로로 작업하기
쓰리프트 JDBC/ODBC 서버를 통해 선호하는 BI 도구를 스파크 SQL에 연결할 수 있다.
쓰리프트 서버 시작하기
‘$SPARK_HOME’ 폴더에서 아래 명령 실행
./sbin/start-thriftserver.sh
태블로 시작하기
기본 패널에 나타나는 목록에서 스파크 SQL을 선택하고, 로컬 아파치 스파크 인스턴스에 연결할 때는 매개 변수와 함께 비보안 사용자 이름 인증 모드를 사용할 수 있다.
서버: localhost
포트: 10000 (기본값)
유형: SparkThriftServer(기본값)
인증: 사용자 이름
사용자 이름: 로그인
SSL 필요: 선택하지 않음
스키마 선택 드롭다운 메뉴에서 ‘기본값’을 선택하고, 테이블 이름을 입력한다.
테이블명으로 people을 입력한 다음 왼쪽에서 기본 대화 상자로 테이블을 끌어다 놓는다.
지금 업데이트를 클릭하면 태블로에서 스파크 SQL 데이터 원본에 쿼리를 시작한다.
스파크 데이터 원본이나 조인 테이블을 통하여 쿼리를 실행 가능하다.
쓰리프트 서버 중지
./sbin/stop-thriftserver.sh
외부 데이터 소스
JDBC 및 SQL 데이터베이스에 대해 알아보고, 스파크 SQL을 사용하여 외부 데이터 소스에 연결하는 방법을 알아보자.
JDBC 및 SQL 데이터베이스
결과를 데이터 프레임으로 반환할 때 이러한 데이터 소스 쿼리를 단순화하므로 스파크 SQL의 모든 이점(성능 및 다른 데이터 소스와 조인 가능한 기능 포함)을 제공한다.
JDBC 데이터 소스에 대한 JDBC 드라이버를 지정해야 시작 가능하고 스파크 클래스 경로에 있어야 한다. ‘$SPARK_HOME’ 폴더에서 실행해야 한다.
./bin/spark-shell --driver-class-path $database.jar --jars $database.jar
데이터 소스 API를 사용하여 데이터 프레임 또는 스파크 SQL 임시뷰로 로드할 수 있다. 또한 데이터 소스 옵션에서 JDBC 연결 속성을 지정할 수 있다.
파티셔닝의 중요성
스파크 SQL과 JDBC 외부 소스 간에 많은 양의 데이터를 전송할 때 데이터 소스를 분할하는 것이 중요하다.
모든 데이터가 하나의 드라이버 연결을 통해 처리되므로 추출 성능을 포화 상태로 만들고 성능을 크게 저하시킬 수 있을 뿐만 아니라 소스 시스템의 리소스를 포화 상태로 만들 수 있다.
예를 하나 들어보자.
-numPartitions:10
-lowerBound:1000
-upperBound:10000
파티션 크기는 1000이 되고 10개의 파티션이 생성된다.
numPartitions
좋은 시작점은 스파크 워커 수의 배수를 사용하는 것이다.
스파크 워커 노드가 4개 있는 경우 파티션 4개 또는 8개로 시작할 수 있다.
소스 시스템이 읽기 요청을 얼마나 잘 처리할 수 있는지도 확인해야 한다. 처리 윈도우가 있는 시스템의 경우 소스 시스템에 대한 동시 요청 수를 최대화 가능하다. 처리 윈도우가 없는 시스템의 경우에는 소스 시스템의 포화를 방지하기 위해 동시 요청 수를 줄여야 한다.
최소 및 최대 partitionColumn의 실제 값을 기준으로 lowerBound 및 upperBound를 기반으로 계산한다.
데이터 스큐를 방지하기 위해 균일하게 분산될 수 있는 partitionColumn을 선택해야 한다.
이러한 경우 기존 partitionColumn 대신 다른 partitionColumn을 사용하거나 가능한 경우 파티션을 더 균등하게 분산하기 위해 새 항목을 생성하도록 한다.
PostgreSQL
메이븐에서 JDBC jar을 빌드하거나 다운로드한 후에 클래스 경로에 추가한다. 해당 jar을 지정하여 스파크 셸(spark-shell 또는 pyspark)을 시작한다.
bin/spark-shell --jars postgresql-42.2.6.jar
스파크 SQL 데이터 소스 API 및 JDBC를 사용하여 PostgreSQL 데이터베이스에서 로드하고 저장하는 방법을 살펴보자.
// 스칼라 예제
// 읽기 방법 1: 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
val jdbcDF1 = spark
.read
.format("jdbc")
.option("url", "jdbc:postgresql:[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load()
// 읽기 방법 2: jdbc 함수를 사용하여 JDBC 소스로부터 데이터를 로드
// 연결 속성 생성
import java.util.Properties
val cxnProp = new Properties()
cxnProp.put("user", "[USERNAME]")
cxnProp.put("password", "[PASSWORD]")
// 연결 속성을 사용하여 데이터를 로드
val jdbcDF2 = spark
.read
.jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)
// 쓰기 방법 1: 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
jdbcDF1
.write
.format("jdbc")
.option("url", "jdbc:postgresql:[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save()
// 쓰기 방법 2: jdbc 함수를 사용하여 JDBC 소스에 데이터를 저장
jdbcDF2.write
.jdbc(s"jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)
파이스파크로 하는 방법
# 파이썬 예제
# 읽기 방법 1: 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF1 = (spark
.read
.format("jdbc")
.option("url", "jdbc:postgresql://[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())
# 읽기 방법 2: jdbc 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF2 = (spark
.read
.jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]", properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))
# 쓰기 방법 1: 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
(jdbcDF1
.write
.format("jdbc")
.option("url", "jdbc:postgresql://[DBSERVER]")
.option("dbtable", "[SCHEMA].[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())
# 쓰기 방법 2: jdbc 함수를 사용하여 JDBC 소스에 데이터를 저장
(jdbcDF2
.write
.jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))
MySQL
MySQL DB에 연결하려면 메이븐 또는 MySQL에서 JDBC jar을 빌드하거나 다운로드한 후에 클래스 경로에 추가하고, jar을 지정하여 스파크 셸 spark-shell 또는 pyspark를 시작한다.
bin/spark-shell --jars mysql -connector-java_8.0.16-bin.jar
스칼라에서 스파크 SQL 데이터 소스 API 및 JDBC를 사용하여 MySQL 데이터베이스에서 데이터를 로드하고 저장하는 방법
// 스칼라 예제
// 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
val jdbcDF = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.jdbcDriver")
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load()
// 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
jdbcDF
.write
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.jdbc.Driver"
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())
파이스파크
# 파이썬 예제
# 로드 함수를 사용하여 JDBC 소스로부터 데이터를 로드
jdbcDF = (spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.jdbcDriver")
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())
# 저장 함수를 사용하여 JDBC 소스에 데이터를 저장
(jdbcDF
.write
.format("jdbc")
.option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
.option("driver", "com.mysql.jdbcDriver")
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())
애저 코스모스 DB와 MS SQL 서버는 교재를 참고하여 보길 바라요 ㅎ
데이터 프레임 및 스파크 SQL의 고차 함수
단순한 데이터 유형의 결합인 복잡한 데이터 유형을 다루는 조작하는 두 가지 방법에 대해 알아보자.
-중첩된 구조를 개별 행으로 분해하고 일부 함수를 적용한 다음 중첩된 구조를 다시 만드는 방법
-사용자 정의 함수 구축
방법 1: 분해 및 수집
중첩된 SQL문에서 explode(values)를 사용하여 values 내의 각 요소에 대한 새로운 행(id 포함)을 만든다.
SELECT id, collect_list(value + 1) AS values
FROM (SELECT id, EXPLODE(values) AS value
FROM table) x
GROUP BY id
collect_list()가 중복된 개체 목록을 반환하지만 GROUP BY 문에는 셔플 작업이 필요하다. 즉, collect_list()의 반환값이 원래 배열의 순서와 반드시 동일하지는 않다.
방법 2: 사용자 정의 함수
위와 동일한 작업을 수행하기 위해서 map()을 사용하여 각 요소(값)를 반복하고 행하려는 작업을 수행하는 UDF를 생성할 수 있다.
// 스칼라 예제
def addOne(values: Seq[Int]): Seq[Int] = {
values.map(value => value + 1)
}
val plusOneInt = spark.udf.register("plusOneInt", addOne(_:Seq[Int]): Seq[Int])
Spark SQL에서 위 UDF를 아래처럼 사용할 수 있다.
spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show()
위 방법은 정렬 문제가 없기 때문에 explode() 및 collect_list()를 사용하는 것보다 낫지만 직렬화 및 역직렬화 프로세스 자체는 비용이 많이 들 수 있다.
collect_list()를 사용하면 실행자들이 대용량 데이터에 대한 메모리 부족 문제를 경험할 수 있다.
복잡한 데이터 유형을 위한 내장 함수
잠재적으로 비용이 많이 드는 기술을 사용하는 대신 복잡한 데이터 유형을 위한 내장 함수를 사용할 수 있다.
고차 함수
내장 함수 외에도 익명 람다 함수를 인수로 사용하는 고차 함수도 있다.
transform() 함수
transform(Values, value -> lambda expression)
transform() 함수는 배열 values 과 익명 함수(람다 표현식)를 입력으로 사용하며, 익명 함수를 적용한 후에 결과를 출력 배열에 할당함으로써 새로운 배열을 투명하게 생성한다.
filter() 함수
filter(array<T>, function<T, Boolean>): array<T>
filter() 함수는 입력한 배열의 요소 중 bool 함수가 참인 요소만으로 구성된 배열을 생성한다.
exists() 함수
exists(array<T>, function<T, V, Boolean>): Boolean
exists() 함수는 입력한 배열의 요소 중 bool 함수를 만족시키는 것이 존재하면 참을 반환한다.
reduce() 함수
reduce(array<T>, B, function<B, T, B>, function<B, R>)
reduce() 함수는 function<B, T, B>를 사용하여 요소를 버퍼 B에 병합하고 최종 버퍼에 마무리 function<B, R>을 적용하여 배열의 요소를 단일값으로 줄인다.
일반적인 데이터 프레임 및 스파크 SQL 작업
Union
동일한 스키마를 가진 두 개의 서로 다른 데이터 프레임을 함께 결합하는 것, union() 함수를 사용하여 수행 가능하다.
Join
두 개의 데이터 프레임을 함께 조인하는 Join 함수의 default는 inner join이며, 옵션으로는 inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi 및 left_anti 등이 있다.
윈도우
윈도우 함수는 윈도우(입력 행의 범위) 행의 값을 사용하여 다른 행의 형태로 값 집합을 반환한다. 모든 입력 행에 대해 단일값을 반환한다.
아래는 윈도우 함수의 종류이다.
수정
데이터 프레임 자체는 변경할 수 없지만, 다른 열을 사용하여 새롭고 다른 데이터 프레임을 만드는 작업을 통해 수정할 수 있다.
열 삭제
열을 삭제하려면 drop() 함수를 사용해야 한다.
칼럼명 바꾸기
withColumnRenamed() 함수를 사용하여 칼럼명을 바꿀 수 있다.
피벗
로우와 칼럼을 바꾸어야 하는 경우 사용한다.
끝
스파크 SQL이 어떻게 다른 요소들과 상호작용하는지, UDF를 포함한 사용자 정의 함수 생성에 대해 알아보았다.