데이터 프레임 API와 스파크 SQL 관계 따지기

작성자 : 박정현

스파크 SQL과 데이터 프레임 : 내장 데이터 소스 소개


스파크 SQL이 외부 구성요소들과 어떻게 소통하는지에 대해 알아보도록 하자.

스파크 SQL은 아래 특징을 가진다.

  1. 상위 수준의 정형화 API가 엔진으로 제공된다.
  2. 다양한 정형 데이터를 읽거나 쓸 수 있다.(JSON, HIVE Table, Parquet, …)
  3. Tableau, Talend 같은 외부 인텔리전스 시스템과 MySQL 등 RDBMS의 데이터를 JDBC / ODBC 커넥터를 이용해 사용 가능
  4. 스파크 애플리케이션에서 데이터베이스 안에 테이블 또는 뷰로 저장되어 있는 정형 데이터와 소통할 수 있도록 프로그래밍 인터페이스 제공
  5. SQL 쿼리를 정형 데이터에 대해 실행할 수 있는 대화형 셸을 제공.
  6. ANSI SQL: 2003 호환 명령 및 HiveQL 지원

JDBC / ODBC : SQL과 연결을 가능하도록 지원하는 프레임워크라고 생각하셈
특히 JDBC는 Java하면서 배웠을 것임

1

스파크 SQL 커넥터와 데이터 소스

스파크 애플리케이션에서 스파크 SQL 사용하기


SparkSession은 정형화 API로 스파크를 프로그래밍 하기 위한 통합된 진입점을 제공함. 따라서 스파크의 기능에 접근할 수 있는 SparkSession을 사용하면 쉽게 클래스를 가져오고 코드에서 인스턴스를 생성할 수 있다.

이를 통해 SQL 쿼리를 수행하려면 spark.sql(…)등 과 같은 sql()함수를 사용함.

이런 쿼리는 원하는 경우 추가적인 스파크 작업이 수행가능한 데이터 프레임으로 반환된다.

기본 쿼리 예제


날짜, 지연, 거리, 출발, 목적지 등 미국 항공편에 대한 데이터가 포함된 항공사 정시 운항 능력 및 비행 지연 원인 데이터세트에 대한 몇 가지 쿼리로 실습한다.

!wget https://raw.githubusercontent.com/DAU-BigDataTeams/LearningSparkV2/master/databricks-datasets/learning-spark-v2/flights/departuredelays.csv
!pip install pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
# SparkSession 생성

spark = (SparkSession
         .builder
         .appName("Spakr4")
         .enableHiveSupport()
         .getOrCreate())

csv = "/content/departuredelays.csv"


# 읽고 임시뷰 생성
# 스키마 추론

df = (spark.read.format("csv").option("inferSchema", "true").option("header", "true").load(csv))
# 위 코드와 아래 코드는 같음
#df = spark.read.csv(csv, header=True, inferSchema=True)
df.createOrReplaceTempView("us_delay_flights_tbl")

이제 임시뷰를 사용할 수 있고, 스파크 SQL을 사용해서 SQL 쿼리를 실행할 수 있다.

상용 SQL과 다르지 않지만 중요한 것은 ANSI:2003과 호환되는 SQL 인터페이스를 제공하고, SQL과 데이터 프레임 간에 상호 운용성을 보이는 점이다.

다음 예제는 비행거리가 1000마일 이상인 모든 항공편을 찾는 쿼리다

spark.sql("""SELECT distance, origin, destination
FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC""").show(10)
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

하지만 3장의 내용을 잘 알면 이런 지저분한 것 보다 API를 활용해 쉽게 동일한 결과를 출력할 수 있다.

(df.select("distance", "origin", "destination")
.where(col("distance") > 1000)
.orderBy(col("distance").desc())
.show(10))
+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

결론적으로, 지금 저자가 하고싶은 말은 MySQL과 같은 상용 SQL 처럼 쿼리를 날릴 수 있다.

여튼 결과를 보면 가장 긴 비행은 호놀룰루(HNL)과 뉴욕(JFK)이었다. 다음으로 샌프란시스코(SFO)와 시카고(ORD) 간 2시간 이상 지연이 있는 항공편을 찾아보자

spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER by delay DESC""").show(10)
+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows
(df
 .select("date", "delay","origin", "destination")
 .where((col("delay") > 120) & (col("origin") == "SFO") & (col("destination") == "ORD"))
 .orderBy(col("delay").desc())
 .show(10)
)
+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows

샌프란시스코와 시카고 사이에도 상당히 많은 지연이 있는 것으로 보인다.

그렇다면 이러한 지연이 휴일과 관련되었는지 계절의 영향인지 알아보자

SQL의 CASE 절을 사용하는 좀 더 복잡한 쿼리를 시도해보자.

spark.sql("""SELECT delay, origin, destination,
            CASE
                WHEN delay > 360 THEN 'Very Long Delays'
                WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
                WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
                WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays'
                WHEN delay = 0 THEN 'No Delays'
                ELSE 'Early'
            END AS Flight_Delays
            FROM us_delay_flights_tbl
            ORDER BY origin, delay DESC""").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows
(df
 .select("delay","origin","destination") # 컬럼 선택
 .withColumn("Flight_Delays",when(col("delay") > 360, "Very Long Delays") # 새로운 열 이름으로 데이터를 구성 when은 조건들임.
 .when((col("delay") >= 120) & (col("delay") <= 360), "Long Delays")
 .when((col("delay") >= 60) & (col("delay") < 120), "Short Delays")
 .when((col("delay") > 0) & (col("delay") < 60), "Tolerable Delays")
 .when(col("delay") == 0, "No Delays")
 .otherwise("Early"))
 .orderBy(col("delay").desc()) # 정렬
 .show(10))
+-----+------+-----------+----------------+
|delay|origin|destination|   Flight_Delays|
+-----+------+-----------+----------------+
| 1642|   TPA|        DFW|Very Long Delays|
| 1638|   SFO|        ORD|Very Long Delays|
| 1636|   FLL|        DFW|Very Long Delays|
| 1592|   RSW|        ORD|Very Long Delays|
| 1560|   BNA|        DFW|Very Long Delays|
| 1553|   PDX|        DFW|Very Long Delays|
| 1543|   CLE|        DFW|Very Long Delays|
| 1511|   MCO|        ORD|Very Long Delays|
| 1500|   EGE|        JFK|Very Long Delays|
| 1496|   ONT|        DFW|Very Long Delays|
+-----+------+-----------+----------------+
only showing top 10 rows

지금까지 내가 SQL을 spark 데이터 프레임 API로 변경해서 작업을 했다.
하면서 느낀거지만 SQL 쿼리 작성보다 API를 활용하는 것이 보다 시각적으로, 의미적으로 해석하기 편한 것 같다.

정형 데이터(CSV)를 쿼리할 수 있도록 스파크는 메모리와 디스크상에서 뷰와 테이블의 생성 및 관리를 해야 하는 복잡한 작업들을 관리한다. 그렇다면 이제 다음 주제인 테이블과 뷰를 생성하고 관리하는 방법으로 넘어가자.

SQL 테이블과 뷰


테이블은 데이터를 갖는다. 스파크는 각 테이블과 해당 데이터에 관련된 정보인 스키마, 설명, 테이블명, DB명, 컬럼명, 파티션, 실제 데이터의 물리적 위치 등 메타데이터를 갖는다.

스파크는 스파크 테이블만을 위한 별도 메타스토어를 생성하지 않고 기본적으로 /user/hive/warehouse에 있는 하이브 메타스토어를 사용해 테이블에 대한 모든 메타데이터를 유지한다.

하지만 spark.sql.warehouse.dir을 로컬 또는 외부 분산 저장소로 설정해서 다른 위치로 기본 경로 설정이 가능함.

메타스토어 자체는 Hive와 관련이 깊어보이니 우선 그런게 있구나 정도로 넘어가자

관리형 테이블과 비관리형 테이블


스파크는 관리형(managed)비관리형(unmanaged) 두 가지 유형의 테이블을 만들 수 있다.

관리형의 경우 메타데이터와 파일 저장소의 데이터를 모드 관리한다.
파일 저장소는 로컬 파일 시스템 혹은 하둡 파일 시스템(HDFS)거나 아마존, 에저(Azure)같은 객체 저장소일 수 있다.
관리형 테이블을 사용하면 스파크는 모든 것을 관리하기 때문에 DROP TABLE <테이블명>과 같은 SQL 명령은 메타데이터와 실제 데이터를 모두 삭제한다.

비관리형 테이블의 경우 스파크는 오직 메타데이터만 관리하고 카산드라와 같은 외부 데이터 소스에서 데이터를 직접 관리한다.
또한 DROP TABLE <테이블명> 명령이 실제 데이터는 그대로 두고 메타데이터만 삭제한다.

spark.sql("DROP TABLE managed_us_delay_flights_tbl")
spark.sql("DROP DATABASE learn_spark_db")
DataFrame[]
# 데이터베이스를 생성하고 스파크에서 사용할 것임을 알려줌
# 근데 말이 데이터베이스지 데이터 프레임임
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
DataFrame[]
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")
csv = "departuredelays.csv"

schema = "date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv, schema = schema)

flights_df.write.saveAsTable("managed_us_delay_flights_tbl")
flights_df.show()
+--------+-----+--------+------+-----------+
|    data|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|    date| null|    null|origin|destination|
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
+--------+-----+--------+------+-----------+
only showing top 20 rows
# 비관리형 테이블 생성
spark.sql("""CREATE TABLE us_delay_flights_tbl (
    date STRING,
    delay INT,
    distance INT,
    origin STRING,
    destination STRING
    )
    USING csv OPTIONS (PATH 'departuredelays.csv')
    """)
DataFrame[]
(flights_df
 .write
 .option("path", "us_flights_delay")
 .saveAsTable("us_delay_flights_tbl"))

뷰 생성하기


테이블을 생성하는 것 외에도 스파크는 기존 테이블을 토대로 뷰를 만들 수 있다.
뷰는 전역 또는 세션 범위일 수 있으며 일시적으로 스파크 애플리케이션이 종료되면 사라진다.

전역, 세션 의미
전역 : 해당 클러스터의 모든 SparkSession에서 볼 수 있음
세션 : 단일 SparkSession에서만 볼 수 있음

뷰 생성은 데이터베이스 내에서 테이블을 생성할 때와 유사한 구문을 사용한다. 뷰를 생성한 후에는 테이블처럼 쿼리할 수 있다. 뷰는 테이블과 달리 실제로 데이터를 소유하지 않기 때문에 스파크 애플리케이션이 종료되면 테이블은 유지되지만 뷰는 사라진다.

SQL을 사용해서 기존 테이블에서 뷰를 생성할 수 있다.

CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_SFO_global_tmp_view_ AS
SELECT date, origin, destination FROM us_delay_flights_tbl WHERE
origin = "SFO';
CREATE OR REPLACE GLOBAL TEMP VIEW us_origin_airport_JFK_tmp_view_ AS
SELECT date, origin, destination FROM us_delay_flights_tbl WHERE
origin = "JFK'

데이터 프레임 API를 통해서도 가능하다.

df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")

df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'")

df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

이러한 뷰를 생성한 후 테이블에 대해 수행하는 것처럼 쿼리를 실행할 수 있다. 스파크는 global_temp라는 전역 임시 데이터베이스에서 전역 임시 뷰를 생성하므로 해당 뷰에 액세스할 때는 global_temp.<view_name> 접두사를 사용해야함.

임시 뷰 vs 전역 임시 뷰


임시 뷰, 전역 임시 뷰는 크게 차이가 없기 때문에 스파크를 처음 접하는 개발자들 사이에서 약간 혼란이 온다.

임시 뷰는 스파크 애플리케이션 내의 단일 SparkSession에 연결된다. 반면에 전역 임시 뷰는 스파크 애플리케이션 내의 여러 SparkSession에서 볼 수 있다.

사용자는 단일 스파크 애플리케이션 내에서 여러 SparkSession을 만들 수 있다.
특히, 동일한 하이브 메타스토어 구성을 공유하지 않는 두 개의 서로 다른 SparkSession에서 같은 데이터에 액세스하고 결합할 때 유용하다.

메타데이터 보기


앞서 언급했듯 스파크는 각 관리형 및 비관리형 테이블에 대한 메타데이터를 관리한다.

스파크 SQL의 상위 모듈인 카탈로그에 저장된다. 이 카탈라고는 스파크 2.x 버전에서 새롭게 확장된 기능임.

Spark 3.0은 외부 카탈로그 사용하도록 개선됨

스파크 애플리케이션 내에서 SparkSession 변수인 spark를 만들고 다음과 같은 함수를 통해 저장된 모든 메타데이터에 액세스 가능하다.

spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")
[Column(name='date', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='delay', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='distance', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='destination', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

SQL 테이블 캐싱하기


다음 장에서 테이블 캐싱 전략에 대해 설명할 것이지만 여기서 간단하게 언급하고자 함.

Spark 3.0에서는 테이블을 LAZY로 지정할 수 있다. 즉, 테이블을 바로 캐싱하지 않고 처음 사용되는 시점에서 캐싱한다.

테이블을 데이터 프레임으로 읽기


데이터 엔지니어는 주로 일반적인 데이터 수집과 ETL 프로세스의 일부로 데이터 파이프라인을 구축한다.

애플리케이션의 다운스트림에서 사용할 수 있도록 스파크 SQL 데이터베이스 및 테이블을 정리된 데이터로 로드한다.

사용할 준비가 된 기존 데이터베이스 learn_spark_db와 테이블 us_delay_flights_tbl이 있다고 가정하자.

외부 JSON에서 읽는 대신 SQL을 이용해 테이블을 쿼리하고 반환 결과를 데이터 프레임에 저장할 수 있다.

us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")

하고 싶은 이야기는 SQL 테이블에서 읽은 가공된 데이터 프레임을 가질 수 있다는 것.

일반적으로 pandas를 사용하면 csv를 읽어서 데이터 프레임으로 변환하지만
스파크는 SQL 테이블에서 데이터 프레임으로 변환할 수 있다는 것을 보임.

us_flights_df.show()
us_flights_df2.show()
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE|        DTW|
+-------+-----+--------+------+-----------+
only showing top 20 rows

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE|        DTW|
+-------+-----+--------+------+-----------+
only showing top 20 rows

데이터 프레임 및 SQL 테이블을 위한 데이터 소스


스파크 SQL은 다양한 데이터 소스에 대한 인터페이스를 제공한다. 또한 데이터 소스 API를 사용해 이러한 데이터 소스로부터 데이터를 읽고 쓸 수 있도록 일반적인 함수를 제공한다.

데이터 소스와 관련된 특정 옵션과 함께 기본 제공 데이터 소스, 사용 가능한 파일 형식, 데이터 로드 및 쓰기 방법에 대해 설명한다.
하지만 먼저 서로 다른 데이터 소스간에 의사소통하는 방법을 제공하는 상위 수준 데이터 소스 API인 DataFrameReaderDataFrameWriter를 자세히 보자.

DataFrameReader


데이터 소스에서 데이터 프레임으로 데이터를 읽기 위한 핵심 구조다.

DataFrameReader.format(args).option("key", "value").schema(args).load()

함수를 함께 연결하는 이 패턴은 스파크에서 일반적으로 사용되며 가독성이 높다.

SQL 쿼리문 보다 훨씬 잘 보임

오직 SparkSession 인스턴스를 통해서만 이 DataFrameReader에 액세스할 수 있다.

인스턴스 핸들을 얻기 위해 다음을 사용해야함.

SparkSession.read
// or
SparkSession.readStream

read는 정적 데이터 소스에서 DataFrame으로 읽기 위해 DataFrameReader에 대한 핸들을 반환하는 반면, readStream은 스트리밍 소스에서 읽을 인스턴스를 반환한다.

DataFrameReader의 공용 함수에 대한 인수는 각각 다른 값을 사용한다.

2

이 책에서는 인수와 옵션의 다른 모든 조합을 다 열거하지는 않겠지만, 파이썬, 스칼라, R 및 자바에 대한 문서는 제안과 지침을 제공한다.

DataFrameWriter


지정된 내장 데이터 소스에 데이터를 저장 혹은 쓰는 작업을 수행한다. SparkSession이 아닌 저장하려는 데이터 프레임에서 인스턴스에 액세스가 가능하다.

DataFrameWriter.format(args).option(args).bucketBy(args).partitionBy(args).save(path)

DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(Table)

인스턴스 핸들을 가져오려면 다음과 같음

근데 말했지만 쓰기는 SparkSession이 아닌 인스턴스에 액세스가 가능함.

DataFrame.write
//or
DataFrame.writeStream

DataFrameWriter관련 함수, 인수 및 옵션이다.

4

뭐.. 다음 내용은 여러 파일 형식에 대한 설명인데 이건 알아서 책을 통해 학습하는게..

신기해서 이미지 파일 읽는 것 보여줌

from pyspark.ml import image

image_dir = "/content/ch01"
image_df = spark.read.format("image").load(image_dir)
image_df.printSchema()
root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
image_df.select("image.height", "image.width", "image.nChannels", "image.mode").show(truncate = False)
+------+-----+---------+----+
|height|width|nChannels|mode|
+------+-----+---------+----+
|555   |852  |4        |24  |
|283   |805  |4        |24  |
|784   |1103 |4        |24  |
|272   |787  |4        |24  |
|162   |1102 |4        |24  |
+------+-----+---------+----+

이미지 채널이 4개인 이유는 아마 CMYK형식이라 그럴 것 같다.

그리고 심지어 이진 파일도 읽을 수 있다.

binary_df = (spark.read.format("binaryFile").option("pathGlobFilter", "*.png").load(image_dir))
binary_df.show()
+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/content/ch0...|2023-07-27 06:33:...|331581|[89 50 4E 47 0D 0...|
|file:/content/ch0...|2023-07-27 06:33:...|316946|[89 50 4E 47 0D 0...|
|file:/content/ch0...|2023-07-27 06:33:...|262759|[89 50 4E 47 0D 0...|
|file:/content/ch0...|2023-07-27 06:33:...|147972|[89 50 4E 47 0D 0...|
|file:/content/ch0...|2023-07-27 06:33:...| 94181|[89 50 4E 47 0D 0...|
+--------------------+--------------------+------+--------------------+

여튼 그래서 데이터 프레임 API와 스파크 SQL 간 상호 운용성에 대해 설명했고 스파크 SQL을 사용해서 얻는 이점도 알아봄.

ㅃ2

Categories:

spark