본문 바로가기메뉴 바로가기


안녕하세요. 카카오 데이터PE셀(응용분석팀)의 Logan입니다. 

응용분석팀에서 식별키 성연령 개발을 담당하고 있습니다. 데이터 분석에 Spark를 메인으로 사용하고 있고, 모델링에는 Tensorflow를 주로 사용하고 있습니다.

여기에서는 Spark Partition에 대한 개념을 소개하고, 최적화 실험을 통해 Spark Partition을 이해하는 방법에 대해 설명하고자 합니다.

Spark를 다루면서 자원 세팅을 어떻게 해야 하는지가 항상 고민이었습니다. 여러 시행착오를 겪으면서 Spark에서 자원 세팅을 하려면 먼저 Partition에 대한 개념을 알아야 한다는 것을 알게 되었습니다. 이번 글에서는 Spark의 Partition 중에서도 Shuffle Partition 최적화에 대해 다루려고 합니다. 


Spark Partition이란?

Spark Partition의 개념

Partition은 RDDs나 Dataset를 구성하고 있는 최소 단위 객체입니다. 각 Partition은 서로 다른 노드에서 분산 처리됩니다.

Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리됩니다. 또한, 하나의 Task는 하나의 Core가 연산 처리합니다.

즉, 1 Core = 1 Task = 1 Partition입니다. 

예를 들어, 다음과 같다면 전체 Core 수를 300개로 세팅한 상태이고, 이 300개가 현재 실행 중인 Task 수이자, 현재 처리 중인 Partition 수에 해당합니다. 또한, 전체 Patition 수는 1800개로 세팅했으며, 이는 전체 Task 수이기도 합니다.

이처럼 설정된 Partition 수에 따라 각 Partition의 크기가 결정됩니다. 그리고 이 Partition의 크기가 결국 Core 당 필요한 메모리 크기를 결정하게 됩니다.

  • Partition 수 → Core 수
  • Partition 크기 → 메모리 크기

따라서, Partition의 크기와 수가 Spark 성능에 큰 영향을 미치는데, 통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고, Partition의 수가 많을수록 Core가 더 필요합니다.

  • 적은 수의 Partition = 크기가 큰 Partition
  • 많은 수의 Partition = 크기가 작은 Partition

즉, Partition의 수를 늘리는 것은 Task 당 필요한 메모리를 줄이고 병렬화의 정도를 늘립니다.

Spark Partition의 종류

동일한 Partition이지만, 쓰이는 때에 따라 다음의 3가지로 구분할 수 있습니다.

  • Input Partition
  • Output Partition
  • Shuffle Partition

이 중, Spark의 주요 연산이 Shuffle인 만큼, Shuffle Partition이 가장 중요합니다.

Input Partition

관련 설정 : spark.sql.files.maxpartitionBytes

Input Partition은 처음 파일을 읽을 때 생성하는 Partition입니다. 관련 설정값은 spark.sql.files.maxPartitionBytes으로, Input Partition의 크기를 설정할 수 있고, 기본값은 134217728(128MB)입니다.

파일 (HDFS 상의 마지막 경로에 존재하는 파일)의 크기가 128MB보다 크다면, Spark에서 128MB만큼 쪼개면서 파일을 읽습니다. 파일의 크기가 128MB보다 작다면 그대로 읽어 들여, 파일 하나당 Partition 하나가 됩니다. 

대부분의 경우, 필요한 칼럼만 골라서 뽑아 쓰기 때문에 파일이 128MB보다 작습니다. 가끔씩 큰 파일을 다룰 경우에는 이 설정값을 조절해야 합니다. 

Use Case

  • 파일 하나의 크기가 매우 크고 수도 많다면, 설정값 크기를 늘리고 자원도 늘려야 하지만, 제 경험상 이런 경우는 없었습니다.
  • 또한, 필요한 칼럼(column)만 쓰기 때문에 데이터의 크기는 더 작아집니다.

Output Partition

관련 설정 : df.repartition(cnt), df.coalesce(cnt)

Output Partition은 파일을 저장할 때 생성하는 Partition입니다. 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정합니다. 

기본적으로, HDFS는 큰 파일을 다루도록 설계되어 있어, 크기가 큰 파일로 저장하는 것이 좋습니다.

보통 HDFS Blocksize에 맞게 설정하면 되는데, 카카오 Hadoop 클러스터의 HDFS Blocksize는 268435456 (256MB)로 설정되어 있어서, 통상적으로 파일 하나의 크기를 256MB에 맞도록 Partition의 수를 설정하면 됩니다.

Partition의 수는 df.repartition(cnt), df.coalesce(cnt)를 통해 설정합니다. 이 repartition와 coalesce를 이용해 Partition 수를 줄일 수 있습니다. 

아래의 예시는, 파일 수를 줄여서 50개로 저장하는 모습입니다. 

Use Case

  • 보통 groupBy 집계 후 저장할 때 데이터의 크기가 작아집니다. 그런 다음 spark.sql.shuffle.partitions 설정에 따라 파일 수가 지정되는데, 이때 파일의 크기를 늘리기 위해 repartition와 coalesce을 사용해 Partition 수를 줄일 수 있습니다.
  • df.where()를 통해 필터링을 하고 나서 그대로 저장한다면 파편화가 생깁니다. 그래서 repartition(cnt)을 한 후 저장합니다.

Shuffle Partition

관련 설정 : spark.sql.shuffle.partitions

Spark 성능에 가장 크게 영향을 미치는 Partition으로, Join, groupBy 등의 연산을 수행할 때 Shuffle Partition이 쓰입니다.

설정값은 spark.sql.shuffle.partitions이고, 이 설정값에 따라 Join, groupBy 수행 시 Partition의 수(또는 Task의 수)가 결정됩니다.

이 설정값은 Core 수에 맞게 설정하라고 하지만, Partition의 크기에 맞추어서 설정해야 합니다.

이 Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어나기 때문입니다. 

Shuffle Spill이 일어나면, Task가 지연되고 에러가 발생할 수 있습니다. 또한, Hadoop 클러스터의 사용률이 높다면, 연달아 에러가 발생하고 Spark가 강제 종료될 수 있습니다.

Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결합니다. 하지만, 모든 사람이 메모리가 부족하다고 메모리 할당량을 늘린다면, 클러스터가 사용성이 더 떨어지고 작업이 더욱더 실패하게 될 것입니다. 그래서 제 개인적인 생각이기도 하지만, Partition의 크기를 결정하는 옵션인 spark.sql.shuffle.partitions를 우선적으로 고려해 설정해야 한다고 생각합니다.

또한, 일반적으로, 하나의 Shuffle Partition 크기가 100~200MB 정도 나올 수 있도록 spark.sql.shuffle.partitions 수를 조절하는 것이 최적입니다.

Use Case

  • Memory Limit Over, Memory Spill 등 자원 문제가 생길 경우, Shuffle Partition 크기를 우선적으로 고려해야 합니다.

최적화 실험

Shuffle Partition 크기가 100~200MB 정도 나올 수 있도록 설정하는 것이 얼마나 중요한지 다음의 최적화 실험을 통해 살펴보겠습니다.

실험 구성

실험 환경

  • 카카오 Hadoop 클러스터
  • spark 3.1.2
  • 3 Cores X 6 GB 메모리 X 100 instances (Core 당 2GB 메모리)

다음의 데이터 집계를 예시로, Shuffle Partition에 대해 최적화를 해보겠습니다.

– data1 schema : (key, info)
– data2 schema : (key, action_name)

데이터 집계에 대한 쿼리 결과는 다음과 같습니다. 

쿼리 결과

action_nameinfocnt
a129928
b368936
c163319
d240775
e53657

예시 코드

// Shuffle Partition 수 설정spark.conf.set("spark.sql.shuffle.partitions", 300) 

val data1 = spark.sql("select * from data1")
val data2 = spark.sql("select * from data2") 

val jExpr = data1.col("key") === data2.col("key")
val joinDF = data1.join(data2, jExpr) 

joinDF.groupBy("action_name", "info")
.agg(count(lit(1)).as("cnt"))
.show

실험 1: 코어 수에 맞게 파티션 수 설정(대조군)

첫 번째 실험에서는 전체 Core 수에 맞게 Partitions 수를 설정하고 위의 코드를 실행합니다.
데모 데이터와 카카오 로그를 조인해서 로그 수를 카운트하는 쿼리입니다.

옵션
spark.sql.Shuffle.Partitions = 300

결과 

위의 코드의 실행 정보는 다음과 같습니다.

총 수행 시간은 8.4h(각 Tasks의 수행 시간의 합)이고 Task 수행 수는 306으로, 6개의 Task에서 에러가 발생해 추가적으로 연산을 수행했습니다(Locality Level Summary: Process local: 306).

Shuffle Read Size가 250GB로 Partition 당 크기가 약 850MB이고, Partition 당 Spill (memory)은 약 2.5 GB ~ 4.5 GB로, 1 Core 당 2 GB 메모리의 자원으로는 작업 수행이 힘듭니다. Hadoop 클러스터의 사용량이 높다면 에러가 반복적으로 발생하고 Spark 앱이 종료될 수 있습니다.

실험 2: 파티션 수 6배 증대

Partition 당 크기가 140MB 정도로 설정이 되도록, 대략 840MB / 6 = 140MB로, 실험 1의 Partition 수의 6배인 1800으로 Partition 수를 설정했습니다.

(또는 전체 Shuffle Size / 140MB를 하면 (250.6GB + 23.6MB) / 140MB = 약 1800)

옵션
spark.sql.Shuffle.Partitions = 1800

결과

총 수행 시간은 5.5h로 Partition을 300으로 설정했을 때보다 2.9h가 줄어들었고, 총 1801개의 Task가 수행되었습니다(Task 하나에서 에러 발생).

전체 Spill은 약 221GB인데, 위의 표에서 보면 특정 Tasks(상위 25%, Max)에서 Spill이 일어났다는 것을 알 수 있습니다. 데이터가 특정 키를 기준으로 몰려있다면(skew), 이런 현상이 나타날 수 있습니다.

어떤 쿼리에서 spill이 생겼는지를 상세 확인하려면, Spark UI > SQL 탭에 들어가 보면 됩니다.

왼쪽 위 사각형에서 Join 하기 직전 Sort 부분에서 Spill이 일어난 것을 확인할 수 있습니다.

카카오 로그는 큰 편이고 특정 키를 기준으로 많은 로그 수가 존재하기 때문에, 쿼리 최적화를 해주어야 합니다.

실험 3: 쿼리 최적화

쿼리 최적화는 로그 데이터를 한번 집계를 한 뒤, 유저 데모 데이터와 Join을 하는 것으로 수정했습니다.

// Partition 수 설정spark.conf.set("spark.sql.shuffle.partitions", 1800) 

val data1 = spark.sql("select * from data1")
val data2 = spark.sql("select * from data2")
.groupBy("key", "action_name") // 조인 전 로그 수 집계
.agg(count(lit(1)).as("cnt")) 

val jExpr = data1.col("key") === data2.col("key")
val joinDF = data1.join(data2, jExpr)  

joinDF.groupBy("action_name", "info")
.agg(sum($"cnt").as("cnt"))
.show

결과

카카오 로그 집계 부분

유저 데모와 join 후 집계 부분

총 수행 시간은 3.3h(3.2h + 22min)으로, 쿼리 최적화를 하기 전보다 2.2h 줄어들었습니다. 

카카오 로그 집계 부분과 데모와 Join 후 집계 부분 둘 다 Spill이 일어나지 않고 정상적으로 동작했습니다.

여기에서 Shuffle Read Size와 Shuffle Write Size의 총합이 300GB가 되지 않는데, 약 600GB 이상의 총 메모리를 사용하는 것이 낭비일 수 있습니다.

그래서 다음의 실험 4에서 3 Cores X 3 GB 메모리 X 100 instances로 실험을 해보았습니다.

실험 4: 최적화 후 코어당 메모리 감소

위의 쿼리 최적화를 한다면 1 Core 당 1GB 메모리에서도 정상적으로 작동합니다.

최적화 실험 결론: 최적화 시 고려할 점

위에서 Partition에 대해서 설명했지만, 가장 중요한 최적화 부분은 코드(쿼리)입니다. 최적화의 우선순위는 쿼리 > Partition 수 > Core 당 메모리 증가입니다.

쿼리는 최대한 groupBy로 집계를 한 후 Join을 하고 그다음에 Partition 수를 조절한 다음, 그래도 안된다면 Core 당 메모리를 증가시켜야 합니다.

Partition 수를 증가시킨다면 Task 수도 늘어나서 실행 시간이 증가될 수 있지만, Shuffle Spill이 일어나지 않도록 한다면 시간이 더 감소됩니다. 따라서, Shuffle Spill이 일어나지 않게 하는 선인 Shuffle Partition의 크기를 100 ~ 200MB로 설정하는 것이 최적입니다.

단, 대부분의 데이터 처리에서 위의 설정이 적합하지만, Shuffle Size가 600GB에 가깝거나 그 이상일 경우에는 Core 당 메모리를 증가시키는 것을 권장합니다. 보통 Shuffle Size가 600GB 이상이 되면 1 코어당 4GB를 고려하는 것을 권장합니다. 

Cartesian join(cross join) 사용으로 Row 수가 급격하게 증가한 경우에도 Shuffle Size가 커지기 때문에 메모리 증가를 고려해야 합니다. 

이 외에도, Spark ML을 사용하거나 Caching을 하는 경우, Spark 메모리 구조 중 Storage Memory Fraction 부분에서 캐싱을 하게 되는데, 이렇게 되면 연산(Execution)을 해야 하는 부분이 줄어들어 결국에는 메모리를 증대해야 합니다.

참고

  • Storage 메모리: Spark의 Cache 데이터 저장을 위해 사용
  • Execution 메모리: Shuffle, Join, Sort, Aggregation 등의 연산 과정에서 임시 데이터 저장을 위해 사용

출처: https://www.tutorialdocs.com/article/spark-memory-management.html

각 실험별 정리

구분실험1실험2실험3실험4
메모리 / Core 수6GB / 3cores6GB / 3cores6GB / 3cores3GB / 3cores
쿼리 최적화XX
Partition 수300 1800 1800 1800 
수행시간 8.4h 5.5h 3.5h 3.7h 
Shuffle Size250GB250GB250GB 250GB 
Shuffle Size / Partition840MB 140MB 140MB 140MB 
Shuffle Spill 770GB220GB 
에러 수61

결론

  • Shuffle Spill이 일어난다면 에러가 발생해 작업이 지연될 수 있습니다. 그리고 Hadoop 클러스터가 busy 상태인 경우, 연달아 에러가 발생하고 강제 종료될 수 있습니다.
  • 메모리가 부족하다면, 우선적으로 Shuffle Partition 수를 고려해야 합니다.
  • Shuffle Partition의 크기를 100MB~200MB 사이로 나오도록 spark.sql.Shuffle.Partitions를 설정해야 합니다.

logan.wvl
logan.wvl 응용분석팀에서 식별키 성연령 개발을 맡고 있습니다.
Top Tag
2021
2021-new-krew
Ad Platform
Ad tech
adaptive-hash-index
adt
adtech
agile
agilecoach
ai
algorithm
Algorithm/ML
Algorithm/Ranking
almighty-data-transmitter
Analyzer
android
angular
anycast
App2App
applicative
Architecture
arena
ast
async
aurora
babel
babel7
Backend
BApp
bgp
big-data
binary
ble
blind-recruitment
block
Block Chain
blockchain
bluetooth
brian
business
Cache
cahtbot
canvasapi
Caver
cch
cd
CDR
ceph
certificate
certification
CF
cgroup
chrome
ci
cite
client
clojure
close-wait
cloud
cloudera-manager
clustered-block
cmux
cnn
code-festival
code-review
codereview
coding
coding test
Collaborative Filtering
competition
Compliance
component
conference
consul
container
contents
contest
contribution
cookie
core-js@3
Corporate Digital Responsibility
couchbase
COVID-19
cpp
Data
data-engineering
DB
deep-learning
Dependency
dependency-graph
desktop
dev
dev-session
dev-track
developer
developer relations
developers
devops
digitalization
digitaltransformation
dns
docker
dr
Electron
employeecard
emscripten
eslint
extract-text
Feature List
Featured
Feedback
friendstime
front-end
frontend
functional-programming
funfunday
fzf
garbage-collection
gawibawibo
GC
github
globalpollution
go
graphdb
graphql
Ground X
growth
ha
hadoop
hate speech
hbase
hbase-manager
hbase-region-inspector
hbase-snashot
hbase-table-stat
hbase-tools
hri
ICPC Sinchon
id
if kakao
ifkakao
infrastructure
innodb
internship
ios
item
Java
javascript
javascript web-assembly
JCMM
JIRA
jsconf
jsconfkorea
json
k8s
kafka
kakao
kakao-Career-Boost-Program
kakao-commerce
kakao-games
kakaoarena
kakaobrain
kakaocommerce
kakaocon
kakaoenterprise
kakaok
kakaokey
kakaokrew
kakaomap
kakaopage
kakaotalk
KAS
KCDC
khaiii
Klaytn
Klip
kubernetes
l3dsr
l4
License
links
Linux
load-balancing
MAB
Machine Learning
machine-learning
map
marathon
meetup
melon
mesos
message
Messaging
microservice
Microsoft TypeScript
mm
mobil
mocking
monad
monorepo
ms-office
MSA
mtre
mysql
mysql-realtime-traffic-emulator
nand-flash
network
new
new-krew
nfc
Nickface
nomad
ocp
olive
onboarding
open
open source
opensource
openstack
OpenWork
OSS
page
parallel
PBA
performance
planning poker
Platform
polyfill
programming-contest
project-structure
pycon
python
quagga
react
reactive-programming
reactor
recap
recommendation
recommendation system
recruitment
redis
redis-keys
redis-scan
related-blind
Renderer
rest
Rome
rubics
ruby
rxjs
s2graph
scala
scalaz
seminar
Serve
server
service
service worker
sharding
shopping
Shuffle Partition
socket
spark
spark-streaming
SpringBoot
ssd
Statistics/Analysis
Stomp
storage
storm
style-guide
summer internship
support
System
talk
talkchannel
tcp
tech
Techtalk
test
thread
Thread-Debugging
time-wait
tmux
Topic Modeling
typescript
Untact
update
User Story
vim
vim-github-dashboard
vim-plugin
vue
vue.js
WASM
web-cache
web-worker
webapp
webgl
WebSocket
webworkers
weekly
work
workplatform
개인화 추천
길찾기
라이선스
연관 추천
오픈소스
오픈소스검증
의존성분석
일하는방식
협업
All Tag
2021
2021-new-krew
Ad Platform
Ad tech
adaptive-hash-index
adt
adtech
agile
agilecoach
ai
algorithm
Algorithm/ML
Algorithm/Ranking
almighty-data-transmitter
Analyzer
android
angular
anycast
App2App
applicative
Architecture
arena
ast
async
aurora
babel
babel7
Backend
BApp
bgp
big-data
binary
ble
blind-recruitment
block
Block Chain
blockchain
bluetooth
brian
business
Cache
cahtbot
canvasapi
Caver
cch
cd
CDR
ceph
certificate
certification
CF
cgroup
chrome
ci
cite
client
clojure
close-wait
cloud
cloudera-manager
clustered-block
cmux
cnn
code-festival
code-review
codereview
coding
coding test
Collaborative Filtering
competition
Compliance
component
conference
consul
container
contents
contest
contribution
cookie
core-js@3
Corporate Digital Responsibility
couchbase
COVID-19
cpp
Data
data-engineering
DB
deep-learning
Dependency
dependency-graph
desktop
dev
dev-session
dev-track
developer
developer relations
developers
devops
digitalization
digitaltransformation
dns
docker
dr
Electron
employeecard
emscripten
eslint
extract-text
Feature List
Featured
Feedback
friendstime
front-end
frontend
functional-programming
funfunday
fzf
garbage-collection
gawibawibo
GC
github
globalpollution
go
graphdb
graphql
Ground X
growth
ha
hadoop
hate speech
hbase
hbase-manager
hbase-region-inspector
hbase-snashot
hbase-table-stat
hbase-tools
hri
ICPC Sinchon
id
if kakao
ifkakao
infrastructure
innodb
internship
ios
item
Java
javascript
javascript web-assembly
JCMM
JIRA
jsconf
jsconfkorea
json
k8s
kafka
kakao
kakao-Career-Boost-Program
kakao-commerce
kakao-games
kakaoarena
kakaobrain
kakaocommerce
kakaocon
kakaoenterprise
kakaok
kakaokey
kakaokrew
kakaomap
kakaopage
kakaotalk
KAS
KCDC
khaiii
Klaytn
Klip
kubernetes
l3dsr
l4
License
links
Linux
load-balancing
MAB
Machine Learning
machine-learning
map
marathon
meetup
melon
mesos
message
Messaging
microservice
Microsoft TypeScript
mm
mobil
mocking
monad
monorepo
ms-office
MSA
mtre
mysql
mysql-realtime-traffic-emulator
nand-flash
network
new
new-krew
nfc
Nickface
nomad
ocp
olive
onboarding
open
open source
opensource
openstack
OpenWork
OSS
page
parallel
PBA
performance
planning poker
Platform
polyfill
programming-contest
project-structure
pycon
python
quagga
react
reactive-programming
reactor
recap
recommendation
recommendation system
recruitment
redis
redis-keys
redis-scan
related-blind
Renderer
rest
Rome
rubics
ruby
rxjs
s2graph
scala
scalaz
seminar
Serve
server
service
service worker
sharding
shopping
Shuffle Partition
socket
spark
spark-streaming
SpringBoot
ssd
Statistics/Analysis
Stomp
storage
storm
style-guide
summer internship
support
System
talk
talkchannel
tcp
tech
Techtalk
test
thread
Thread-Debugging
time-wait
tmux
Topic Modeling
typescript
Untact
update
User Story
vim
vim-github-dashboard
vim-plugin
vue
vue.js
WASM
web-cache
web-worker
webapp
webgl
WebSocket
webworkers
weekly
work
workplatform
개인화 추천
길찾기
라이선스
연관 추천
오픈소스
오픈소스검증
의존성분석
일하는방식
협업

위로