5장에서는 DataFrame의 데이터를 다루는 기능을 소개함. 집계 윈도우 조인등의 내용은 7,8장
간단한 코드 예제는 github 링크로 대체
Spark/Part2/Chapter5.ipynb at main · PingPingE/Spark
Contribute to PingPingE/Spark development by creating an account on GitHub.
github.com
이론은 기존 4장과 유사해서 4장의 링크를 첨부
pyspark 스파크 프로그래밍 기초 학습 - 박홍 데이터베이스
Summary 챕터4에서는 pyspark의 기본적인 RDD관련 내용이 나옴page수가 많아서 걱정했는데 그냥 … 코드블럭과 기본적인 연산이어떻게 일어나는지에 대한 하나하나의 설명이였다.기초적인 내용이라
devhyung.github.io
Term
- 레코드 : row
- 컬럼 : 연산 표현식을 나타내는 = 스프레드시트 컬럼
- 스키마 : 각 컬럼명과 데이터 타입 정의
- 파티셔닝 : DF or Dataset이 클러스터에서 물리적으로 배치되는 형태
스키마(레코드+컬럼)
DF의 컬럼명과 데이터 타입을 정의함.
from pyspark.sql import types as T
import pyspark
spark.read.format('json').load(path).schema
#스키마 출력
df.printSchema()
#직접 스키마 정의
myManualSchema = T.StructType([
T.StructField('DEST_COUNTRY_NAME', T.StringType(), True),
T.StructField('ORIGIN_COUNTRY_NAME', T.StringType(), True),
T.StructField('count', T.LongType(), False, metadata={'hello':'world'})
])
#col함수로 컬럼 생성
from pyspark.sql import functions as F
F.col('someColumnName')
#Row
from pyspark.sql import Row
myRow = Row("hello", None, 1, False)
스키마생성 및 예제
myManualSchema=T.StructType([
T.StructField("some", T.StringType(), True),
T.StructField("col", T.StringType(), True),
T.StructField("names", T.LongType(), True),
])
myRow = Row("hello", None,1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
#select dest_country_name from table limit 2
df.select("DEST_COUNTRY_NAME").show(2)
+-----------------+ DEST_COUNTRY_NAME| +-----------------+ United States| United States| +-----------------+ only
#select dest_country_name, origin_country_name from table limit 2
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
+-----------------+-------------------+ DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| +-----------------+-------------------+ United States| Romania| United States| Croatia| +-----------------+-------------------+ only showing top 2
#expr함수로 컬럼 참조하기
#expr함수는 단순 컬럼 참조나 문자열을 이용해 컬럼을 참조할 수 있음
df.select(F.expr("dest_country_name as destination")).show(2)
#리터럴
df.selectExpr('*', '1 as one').show(5)
+-----------------+-------------------+-----+---+ DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one| +-----------------+-------------------+-----+---+ United States| Romania| 15| 1| United States| Croatia| 1| 1| United States| Ireland| 344| 1| Egypt| United States| 15| 1| United States| India| 62| 1| +-----------------+-------------------+-----+---+
#withcol
df.withColumn('withinCountry', F.expr('origin_country_name == dest_country_name')).show(5)
+-----------------+-------------------+-----+-------------+ DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry| +-----------------+-------------------+-----+-------------+ United States| Romania| 15| false| United States| Croatia| 1| false| United States| Ireland| 344| false| Egypt| United States| 15| false| United States| India| 62| false| +-----------------+-------------------+-----+---
#필터링
df.filter(F.col('count')<2).show(2)
+-----------------+-------------------+-----+ DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ United States| Croatia| 1| United States| Singapore| 1| +-----------------+-------------------+-----+ only showing top 2 rows
#합치기
schema= df.schema
newRows= [
Row('new_country', 'other_country', 5),
Row('new_country_2', 'otheR_country_3', 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)
.where('count=1')
.where(F.col('ORIGIN_COUNTRY_NAME') != 'United States').show()
정렬
df.sort('count').show(3)
+-----------------+-------------------+-----+ DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ Moldova| United States| 1| United States| Singapore| 1| United States| Croatia| 1| +-----------------+-------------------+-----+ only showing top 3 rows
df.orderBy(F.desc('count'), 'DEST_COUNTRY_NAME').show(3)
+-----------------+-------------------+------+ DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count| +-----------------+-------------------+------+ United States| United States|370002| United States| Canada| 8483| Canada| United States| 8399| +-----------------+-------------------+------+ only showing top 3 rows
리파티셔닝
df.rdd.getNumPartitions()
Out[100]: 1
#파티션 수 지정
df.repartition(5)
Out[99]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
#특정 컬럼 기준 파티션 재분배
df.repartition(F.col('DEST_COUNTRY_NAME'))
Out[102]: DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
#특정 컬럼을 기준으로 셔플을 수행해서 5개의 파티션으로 나누고,
#전체 데이터를 셔플 없이 병합
df.repartition(5, col('DEST_COUNTRY_NAME')).coalesce(2)
'스터디 > 책 모임' 카테고리의 다른 글
[PySpark] 스파크 완벽 가이드 - 4장 구조적 API 개요 (1) | 2024.05.02 |
---|---|
[PySpark] 스파크 완벽 가이드 - 17장 스파크 배포 환경 (0) | 2024.04.24 |
[PySpark] 스파크 완벽 가이드 - 16장 스파크 어플리케이션 개발하기 (0) | 2024.04.24 |
[PySpark] 스파크 완벽 가이드 - 15장 클러스터에서 스파크 실행하기 (0) | 2024.04.24 |