Spark Shuffle Partition과 최적화

안녕하세요. 카카오 데이터PE셀(응용분석팀)의 Logan입니다. 

응용분석팀에서 식별키 성연령 개발을 담당하고 있습니다. 데이터 분석에 Spark를 메인으로 사용하고 있고, 모델링에는 Tensorflow를 주로 사용하고 있습니다.

여기에서는 Spark Partition에 대한 개념을 소개하고, 최적화 실험을 통해 Spark Partition을 이해하는 방법에 대해 설명하고자 합니다.

Spark를 다루면서 자원 세팅을 어떻게 해야 하는지가 항상 고민이었습니다. 여러 시행착오를 겪으면서 Spark에서 자원 세팅을 하려면 먼저 Partition에 대한 개념을 알아야 한다는 것을 알게 되었습니다. 이번 글에서는 Spark의 Partition 중에서도 Shuffle Partition 최적화에 대해 다루려고 합니다. 

 


 

Spark Partition이란?

 

Spark Partition의 개념

Partition은 RDDs나 Dataset를 구성하고 있는 최소 단위 객체입니다. 각 Partition은 서로 다른 노드에서 분산 처리됩니다.

Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리됩니다. 또한, 하나의 Task는 하나의 Core가 연산 처리합니다.

즉, 1 Core = 1 Task = 1 Partition입니다. 

예를 들어, 다음과 같다면 전체 Core 수를 300개로 세팅한 상태이고, 이 300개가 현재 실행 중인 Task 수이자, 현재 처리 중인 Partition 수에 해당합니다. 또한, 전체 Patition 수는 1800개로 세팅했으며, 이는 전체 Task 수이기도 합니다.

 

이처럼 설정된 Partition 수에 따라 각 Partition의 크기가 결정됩니다. 그리고 이 Partition의 크기가 결국 Core 당 필요한 메모리 크기를 결정하게 됩니다.

  • Partition 수 → Core 수
  • Partition 크기 → 메모리 크기

 

따라서, Partition의 크기와 수가 Spark 성능에 큰 영향을 미치는데, 통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고, Partition의 수가 많을수록 Core가 더 필요합니다.

  • 적은 수의 Partition = 크기가 큰 Partition
  • 많은 수의 Partition = 크기가 작은 Partition

 

즉, Partition의 수를 늘리는 것은 Task 당 필요한 메모리를 줄이고 병렬화의 정도를 늘립니다.

 

Spark Partition의 종류

동일한 Partition이지만, 쓰이는 때에 따라 다음의 3가지로 구분할 수 있습니다.

  • Input Partition
  • Output Partition
  • Shuffle Partition

 

이 중, Spark의 주요 연산이 Shuffle인 만큼, Shuffle Partition이 가장 중요합니다.

 

Input Partition

관련 설정 : spark.sql.files.maxpartitionBytes

Input Partition은 처음 파일을 읽을 때 생성하는 Partition입니다. 관련 설정값은 spark.sql.files.maxPartitionBytes으로, Input Partition의 크기를 설정할 수 있고, 기본값은 134217728(128MB)입니다.

파일 (HDFS 상의 마지막 경로에 존재하는 파일)의 크기가 128MB보다 크다면, Spark에서 128MB만큼 쪼개면서 파일을 읽습니다. 파일의 크기가 128MB보다 작다면 그대로 읽어 들여, 파일 하나당 Partition 하나가 됩니다. 

대부분의 경우, 필요한 칼럼만 골라서 뽑아 쓰기 때문에 파일이 128MB보다 작습니다. 가끔씩 큰 파일을 다룰 경우에는 이 설정값을 조절해야 합니다. 

Use Case

  • 파일 하나의 크기가 매우 크고 수도 많다면, 설정값 크기를 늘리고 자원도 늘려야 하지만, 제 경험상 이런 경우는 없었습니다.
  • 또한, 필요한 칼럼(column)만 쓰기 때문에 데이터의 크기는 더 작아집니다.

 

Output Partition

관련 설정 : df.repartition(cnt), df.coalesce(cnt)

Output Partition은 파일을 저장할 때 생성하는 Partition입니다. 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정합니다. 

기본적으로, HDFS는 큰 파일을 다루도록 설계되어 있어, 크기가 큰 파일로 저장하는 것이 좋습니다.

보통 HDFS Blocksize에 맞게 설정하면 되는데, 카카오 Hadoop 클러스터의 HDFS Blocksize는 268435456 (256MB)로 설정되어 있어서, 통상적으로 파일 하나의 크기를 256MB에 맞도록 Partition의 수를 설정하면 됩니다.

Partition의 수는 df.repartition(cnt), df.coalesce(cnt)를 통해 설정합니다. 이 repartition와 coalesce를 이용해 Partition 수를 줄일 수 있습니다. 

아래의 예시는, 파일 수를 줄여서 50개로 저장하는 모습입니다. 

Use Case

  • 보통 groupBy 집계 후 저장할 때 데이터의 크기가 작아집니다. 그런 다음 spark.sql.shuffle.partitions 설정에 따라 파일 수가 지정되는데, 이때 파일의 크기를 늘리기 위해 repartition와 coalesce을 사용해 Partition 수를 줄일 수 있습니다.
  • df.where()를 통해 필터링을 하고 나서 그대로 저장한다면 파편화가 생깁니다. 그래서 repartition(cnt)을 한 후 저장합니다.

 

Shuffle Partition

관련 설정 : spark.sql.shuffle.partitions

Spark 성능에 가장 크게 영향을 미치는 Partition으로, Join, groupBy 등의 연산을 수행할 때 Shuffle Partition이 쓰입니다.

설정값은 spark.sql.shuffle.partitions이고, 이 설정값에 따라 Join, groupBy 수행 시 Partition의 수(또는 Task의 수)가 결정됩니다.

 

이 설정값은 Core 수에 맞게 설정하라고 하지만, Partition의 크기에 맞추어서 설정해야 합니다.

이 Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어나기 때문입니다. 

Shuffle Spill이 일어나면, Task가 지연되고 에러가 발생할 수 있습니다. 또한, Hadoop 클러스터의 사용률이 높다면, 연달아 에러가 발생하고 Spark가 강제 종료될 수 있습니다.

Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결합니다. 하지만, 모든 사람이 메모리가 부족하다고 메모리 할당량을 늘린다면, 클러스터가 사용성이 더 떨어지고 작업이 더욱더 실패하게 될 것입니다. 그래서 제 개인적인 생각이기도 하지만, Partition의 크기를 결정하는 옵션인 spark.sql.shuffle.partitions를 우선적으로 고려해 설정해야 한다고 생각합니다.

또한, 일반적으로, 하나의 Shuffle Partition 크기가 100~200MB 정도 나올 수 있도록 spark.sql.shuffle.partitions 수를 조절하는 것이 최적입니다.

Use Case

  • Memory Limit Over, Memory Spill 등 자원 문제가 생길 경우, Shuffle Partition 크기를 우선적으로 고려해야 합니다.

 

최적화 실험

 

Shuffle Partition 크기가 100~200MB 정도 나올 수 있도록 설정하는 것이 얼마나 중요한지 다음의 최적화 실험을 통해 살펴보겠습니다.

 

실험 구성

실험 환경

  • 카카오 Hadoop 클러스터
  • spark 3.1.2
  • 3 Cores X 6 GB 메모리 X 100 instances (Core 당 2GB 메모리)

 

다음의 데이터 집계를 예시로, Shuffle Partition에 대해 최적화를 해보겠습니다.

– data1 schema : (key, info)
– data2 schema : (key, action_name)

데이터 집계에 대한 쿼리 결과는 다음과 같습니다. 

쿼리 결과

action_name info cnt
a 1 29928
b 3 68936
c 1 63319
d 2 40775
e 5 3657

예시 코드

// Shuffle Partition 수 설정spark.conf.set("spark.sql.shuffle.partitions", 300) 

val data1 = spark.sql("select * from data1")
val data2 = spark.sql("select * from data2") 

val jExpr = data1.col("key") === data2.col("key")
val joinDF = data1.join(data2, jExpr) 

joinDF.groupBy("action_name", "info")
.agg(count(lit(1)).as("cnt"))
.show

 

 

실험 1: 코어 수에 맞게 파티션 수 설정(대조군)

첫 번째 실험에서는 전체 Core 수에 맞게 Partitions 수를 설정하고 위의 코드를 실행합니다.
데모 데이터와 카카오 로그를 조인해서 로그 수를 카운트하는 쿼리입니다.

옵션
spark.sql.Shuffle.Partitions = 300

 

결과 

위의 코드의 실행 정보는 다음과 같습니다.

총 수행 시간은 8.4h(각 Tasks의 수행 시간의 합)이고 Task 수행 수는 306으로, 6개의 Task에서 에러가 발생해 추가적으로 연산을 수행했습니다(Locality Level Summary: Process local: 306).

Shuffle Read Size가 250GB로 Partition 당 크기가 약 850MB이고, Partition 당 Spill (memory)은 약 2.5 GB ~ 4.5 GB로, 1 Core 당 2 GB 메모리의 자원으로는 작업 수행이 힘듭니다. Hadoop 클러스터의 사용량이 높다면 에러가 반복적으로 발생하고 Spark 앱이 종료될 수 있습니다.

 

 

실험 2: 파티션 수 6배 증대

Partition 당 크기가 140MB 정도로 설정이 되도록, 대략 840MB / 6 = 140MB로, 실험 1의 Partition 수의 6배인 1800으로 Partition 수를 설정했습니다.

(또는 전체 Shuffle Size / 140MB를 하면 (250.6GB + 23.6MB) / 140MB = 약 1800)

옵션
spark.sql.Shuffle.Partitions = 1800

 

결과

총 수행 시간은 5.5h로 Partition을 300으로 설정했을 때보다 2.9h가 줄어들었고, 총 1801개의 Task가 수행되었습니다(Task 하나에서 에러 발생).

전체 Spill은 약 221GB인데, 위의 표에서 보면 특정 Tasks(상위 25%, Max)에서 Spill이 일어났다는 것을 알 수 있습니다. 데이터가 특정 키를 기준으로 몰려있다면(skew), 이런 현상이 나타날 수 있습니다.

어떤 쿼리에서 spill이 생겼는지를 상세 확인하려면, Spark UI > SQL 탭에 들어가 보면 됩니다.

왼쪽 위 사각형에서 Join 하기 직전 Sort 부분에서 Spill이 일어난 것을 확인할 수 있습니다.

카카오 로그는 큰 편이고 특정 키를 기준으로 많은 로그 수가 존재하기 때문에, 쿼리 최적화를 해주어야 합니다.

 

실험 3: 쿼리 최적화

쿼리 최적화는 로그 데이터를 한번 집계를 한 뒤, 유저 데모 데이터와 Join을 하는 것으로 수정했습니다.

// Partition 수 설정spark.conf.set("spark.sql.shuffle.partitions", 1800) 

val data1 = spark.sql("select * from data1")
val data2 = spark.sql("select * from data2")
.groupBy("key", "action_name") // 조인 전 로그 수 집계
.agg(count(lit(1)).as("cnt")) 

val jExpr = data1.col("key") === data2.col("key")
val joinDF = data1.join(data2, jExpr)  

joinDF.groupBy("action_name", "info")
.agg(sum($"cnt").as("cnt"))
.show

결과

카카오 로그 집계 부분

유저 데모와 join 후 집계 부분

총 수행 시간은 3.3h(3.2h + 22min)으로, 쿼리 최적화를 하기 전보다 2.2h 줄어들었습니다. 

카카오 로그 집계 부분과 데모와 Join 후 집계 부분 둘 다 Spill이 일어나지 않고 정상적으로 동작했습니다.

여기에서 Shuffle Read Size와 Shuffle Write Size의 총합이 300GB가 되지 않는데, 약 600GB 이상의 총 메모리를 사용하는 것이 낭비일 수 있습니다.

그래서 다음의 실험 4에서 3 Cores X 3 GB 메모리 X 100 instances로 실험을 해보았습니다.

 

실험 4: 최적화 후 코어당 메모리 감소

위의 쿼리 최적화를 한다면 1 Core 당 1GB 메모리에서도 정상적으로 작동합니다.

 

최적화 실험 결론: 최적화 시 고려할 점

위에서 Partition에 대해서 설명했지만, 가장 중요한 최적화 부분은 코드(쿼리)입니다. 최적화의 우선순위는 쿼리 > Partition 수 > Core 당 메모리 증가입니다.

쿼리는 최대한 groupBy로 집계를 한 후 Join을 하고 그다음에 Partition 수를 조절한 다음, 그래도 안된다면 Core 당 메모리를 증가시켜야 합니다.

Partition 수를 증가시킨다면 Task 수도 늘어나서 실행 시간이 증가될 수 있지만, Shuffle Spill이 일어나지 않도록 한다면 시간이 더 감소됩니다. 따라서, Shuffle Spill이 일어나지 않게 하는 선인 Shuffle Partition의 크기를 100 ~ 200MB로 설정하는 것이 최적입니다.

단, 대부분의 데이터 처리에서 위의 설정이 적합하지만, Shuffle Size가 600GB에 가깝거나 그 이상일 경우에는 Core 당 메모리를 증가시키는 것을 권장합니다. 보통 Shuffle Size가 600GB 이상이 되면 1 코어당 4GB를 고려하는 것을 권장합니다. 

Cartesian join(cross join) 사용으로 Row 수가 급격하게 증가한 경우에도 Shuffle Size가 커지기 때문에 메모리 증가를 고려해야 합니다. 

이 외에도, Spark ML을 사용하거나 Caching을 하는 경우, Spark 메모리 구조 중 Storage Memory Fraction 부분에서 캐싱을 하게 되는데, 이렇게 되면 연산(Execution)을 해야 하는 부분이 줄어들어 결국에는 메모리를 증대해야 합니다.

참고

  • Storage 메모리: Spark의 Cache 데이터 저장을 위해 사용
  • Execution 메모리: Shuffle, Join, Sort, Aggregation 등의 연산 과정에서 임시 데이터 저장을 위해 사용

 

출처: https://www.tutorialdocs.com/article/spark-memory-management.html

 

각 실험별 정리

구분 실험1 실험2 실험3 실험4
메모리 / Core 수 6GB / 3cores 6GB / 3cores 6GB / 3cores 3GB / 3cores
쿼리 최적화 X X
Partition 수 300  1800  1800  1800 
수행시간  8.4h  5.5h  3.5h  3.7h 
Shuffle Size 250GB 250GB 250GB  250GB 
Shuffle Size / Partition 840MB  140MB  140MB  140MB 
Shuffle Spill  770GB 220GB 
에러 수 6 1

 

결론

 

  • Shuffle Spill이 일어난다면 에러가 발생해 작업이 지연될 수 있습니다. 그리고 Hadoop 클러스터가 busy 상태인 경우, 연달아 에러가 발생하고 강제 종료될 수 있습니다.
  • 메모리가 부족하다면, 우선적으로 Shuffle Partition 수를 고려해야 합니다.
  • Shuffle Partition의 크기를 100MB~200MB 사이로 나오도록 spark.sql.Shuffle.Partitions를 설정해야 합니다.

 

Latest Posts

[get Server!] [커머스CIC] 채널개발파트 소개 드려요!

평소 커머스 도메인에 관심이 많았다면? 톡딜을 통해 핫템을 득해본적이 있다면? 한번이라도 라이브커머스를 넋놓고 쳐다본적이 있다면? 라이언이랑 춘식이랑 함께하는 카카오 커머스CIC에서 개발자의 꿈을