AQE: Coalescing Post Shuffle Partitions

안녕하세요. 카카오 데이터PE셀(응용분석팀)의 Logan입니다. 

응용분석팀에서 식별키 성연령 개발을 담당하고 있습니다. 데이터 분석에 Spark를 메인으로 사용하고 있고, 모델링에는 Tensorflow를 주로 사용하고 있습니다.

지난 번 글인 “Spark Shuffle Partition과 최적화” 에서는 Spark Partition에 대한 개념을 소개하고, 최적화 실험을 통해 Spark Partition을 이해하는 방법에 대해 설명한 바 있습니다. 이번에는 동적 최적화 프레임워크인 Adaptive Query Execution(이하 AQE)에 대해 설명하고, 주로 지원하는 최적화 기법 중 Coalescing Post Shuffle Partitions(Partition 수를 줄이는 기능)에 대해 말씀 드리고자 합니다.

[참고] Spark Partition의 개념

Partition은 RDDs나 Dataset를 구성하고 있는 최소 단위 객체입니다. 각 Partition은 서로 다른 노드에서 분산 처리됩니다. Spark에서는 하나의 최소 연산을 Task라고 표현하는데, 이 하나의 Task에서 하나의 Partition이 처리됩니다. 또한, 하나의 Task는 하나의 Core가 연산 처리합니다.

즉, 1 Core = 1 Task = 1 Partition입니다. 

설정된 Partition 수에 따라 각 Partition의 크기가 결정됩니다. 그리고 이 Partition의 크기가 결국 Core 당 필요한 메모리 크기를 결정하게 됩니다.

◼️ Partition 수 → Core 수
◼️ Partition 크기 → 메모리 크기

따라서, Partition의 크기와 수가 Spark 성능에 큰 영향을 미치는데, 통상적으로는 Partition의 크기가 클수록 메모리가 더 필요하고, Partition의 수가 많을수록 Core가 더 필요합니다.

◼️ 적은 수의 Partition = 크기가 큰 Partition
◼️ 많은 수의 Partition = 크기가 작은 Partition


 

AQE의 등장

 

Spark 3.0 버전에서 동적으로 최적화해주는 프레임워크인 Adaptive Query Execution(AQE)가 처음 등장했습니다. 또한 Spark 3.2 버전부터는 AQE를 On하는 설정인 spark.sql.adaptive.enabled의 값이 디폴트로 true가 되었습니다.

기존의 Spark SQL의 쿼리 옵티마이저는 1.x대 버전에서는 룰 기반(Rule-Based Optimization)을, 2.x대 버전에서는 룰 기반 외에 코스트 기반을 포함해 최적화를 실행하게 되었습니다. 

코스트 기반 최적화(Cost-Based Optimization, CBO)의 경우, 올바른 Join 타입을 선택하거나 다자간 조인에서 Join의 순서를 조정하는 방식으로 최적화를 실행할 수 있습니다. 하지만, CBO에는 통계치 수집에 많은 비용이 들어가고, 통계치가 오래된 경우에는 예측이 부정확해져 최적화가 되지 않는 등의 한계가 있었습니다. 이러한 한계를 극복하고자 등장한 것이 AQE(Adaptive Query Execution)로, AQE는 런타임 시 발생하는 다양한 통계치를 수집해 성능 개선을 가능하게 합니다.

(출처: https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html)

AQE는 데이터의 row 수, distinct value의 수 와 같은 통계치를 가지고 데이터 작업 중(런타임)에 최적화를 해줍니다.

크게 2가지 최적화 기법이 있는데, partition의 수를 줄여주는 기능과 최적화된 Join을 적용하는 기능이 있습니다. 그중 많이 사용될 것이라고 생각되는 partition의 수를 줄여주는 기능Coalescing Post Shuffle Partitions에 대해 알아보겠습니다.

 

AQE이란?

AQE 최적화는 이름 그대로 shuffle이 끝난 다음 partition을 coalesce(병합)를 해주는 기능입니다. 너무 많은 partition은 많은 task가 필요하거나 I/O를 많이 유발할 수 있기 때문에 적절한 수가 필요한데, AQE 기능이 적절한 partition의 수를 정해 줍니다.

AQE 기능은, 설정에서 spark.sql.adaptive.enabled와 spark.sql.adaptive.coalescePartitions.enabled가 true 일 때 작동합니다. Spark 3.2 버전부터는 default 값이 true이므로 자동으로 적용이 됩니다.

그리고 AQE 기능은 기본적으로 coalesce를 실행하므로 충분히 많은 수의 partition을 설정해야 합니다. AQE의 Partition 수는 spark.sql.adaptive.coalescePartitions.initialPartitionNum으로 설정할 수 있습니다. 이 값이 설정되어 있지 않으면 spark.sql.shuffle.partitions 값을 따라가게 됩니다.

 

Coalescing Post Shuffle Partitions 사용하기


이제 Coalescing Post Shuffle Partitions에 대해 알아보겠습니다. 예제 코드는 다음과 같습니다.

 

예제 코드

데이터 B를 key와 code로 row 수를 집계한 다음 데이터 A와 조인을 해 code, info1, info2 기준으로 row 수를 집계하는 코드입니다.

 

실험 방법과 결과

spark.sql.shuffle.partitions을 10000으로 설정하고 spark.sql.adaptive.coalescePartitions.enabled를 true/false로 바꿔가면서 실험했습니다.

실험 결과는 다음과 같습니다.

[참고] Spark Partition의 종류

Spark Partition은 쓰이는 때에 따라 다음의 3가지로 구분할 수 있습니다.

◼️ Input Partition: 처음 파일을 읽을 때 생성하는 Partition
◼️ Output Partition: 파일을 저장할 때 생성하는 Partition
◼️ Shuffle Partition: Spark 성능에 가장 크게 영향을 미치는 Partition으로, Join, groupBy 등의 연산을 수행할 때 사용됨

이 중, Spark의 주요 연산이 Shuffle인 만큼, Shuffle Partition이 가장 중요합니다.

더 자세한 설명은 https://tech.kakao.com/2021/10/08/spark-shuffle-partition/를 참고해 주세요.

 

Input으로는 동일하게 각각 9943, 300개의 partition이 생성되었지만, shuffle이 일어나는 단계부터 partition의 수가 달라집니다.

첫 번째 shuffle에서는 10000 -> 5000, 두 번째 shuffle에서는 10000 -> 358, 마지막 shuffle에서는 10000 -> 118로 partition의 수가 줄어듭니다.

이 partition의 수는 설정값 spark.sql.adaptive.advisoryPartitionSizeInBytes의 크기가 결정하며, 기본값은 ’64m’ (64MB) 입니다. 이 크기에 가깝게 partition의 수가 정해집니다.

5000개의 partition이 생성되는 stage를 보면 위의 사진과 같이 평균적으로 60MB 근처에서 partition의 크기가 결정되는 것을 알 수 있습니다.

총 shuffle size를 64MB로 나누면 (262.7 + 21.7) * 1024 / 60 = 4853으로, 약 5000개가 나옵니다.

Spark UI의 SQL 탭에 들어가면 실행 과정을 UI로 확인할 수 있습니다. 세 번의 shuffle이 일어나는 부분으로 나누어서 살펴보겠습니다.

 

첫 번째 shuffle

처음 group by가 일어나는 부분(B의 group by)을 자세히 보면 기본적으로 HashAggregate → Exchange → HashAggregate 연산을 합니다.

첫 번째 HashAggregate에서 부분적으로 카운팅을 하는 partial_count(1)을 하고 다음 Exchange에서 실제로 shuffle이 일어나고, 다음 두 번째 HashAggregate에서 전체 카운팅을 하는 count(1)을 합니다.

여기에서 Exchange 바로 다음에 AQEShuffleRead가 추가되어 number of partitions가 10000에서 5000으로 줄어들었습니다. AQE가 partition의 크기를 64MB에 근접하도록 최적화한 결과입니다.

partition의 수가 5000으로 줄어든 다음 두 번째 HashAggregate에서 peak memory total도 634.8GB → 332.0GB로 줄어 들었다는 것도 확인할 수 있습니다.

 

두 번째 shuffle

두 번째 shuffle은 Join 전에 일어납니다. Exchange에서 shuffle이 일어나고 다음 AQEShuffleRead에서 partition의 수를 10000 → 358로 감소시킵니다.

여기에서 흥미로운 점은 앞 단계에서 partition의 수를 5000으로 줄였지만 다시 shuffle을 하면서 partition의 수가 10000이 되었다는 점입니다.

spark.sql.shuffle.partitions 의 값을 10000으로 설정했기 때문에 shuffle이 일어날 때마다 partition의 수가 10000이 됩니다.

연관된 설정으로 spark.sql.adaptive.coalescePartitions.initialPartitionNum가 있는데 default 값은 spark.sql.shuffle.partitions 값을 따라갑니다.

spark.sql.shuffle.partitions과 spark.sql.adaptive.coalescePartitions.initialPartitionNum가 동시에 설정되어 있다면 spark.sql.adaptive.coalescePartitions.initialPartitionNum가 우선적으로 적용됩니다.

추천 드리는 설정 방법은 spark.sql.shuffle.partitions를 충분히 큰 값으로 설정한 후에 한번 실행하고, shuffle partition의 수 중 가장 큰 값을 반올림한 값으로 다시 설정하시길 바랍니다.

 

세 번째 Shuffle

세 번째 shuffle은 C의 group by(partial_sum)가 일어난 후에 일어납니다. partition의 수가 10000 → 118로 줄어들었습니다.

그리고 이 118개의 partition이 output partition으로 이어집니다. 여기서도 한 가지 흥미로운 점은 partition 사이즈가 총 120MB 정도인데, partition의 수가 2개가 되어야 하는데 118개가 되었다는 점입니다. 이와 관련된 설정값으로 spark.sql.adaptive.coalescePartitions.parallelismFirst가 있습니다. default로 true를 가집니다.

이 설정은 true로 설정할 경우 병렬성을 우선적으로 고려해서 할당한 코어를 최대한 많이 사용하려고 합니다. 이 경우 spark.sql.adaptive.advisoryPartitionSizeInBytes로 설정되는 최종 partition의 크기는 무시되며, spark.sql.adaptive.coalescePartitions.minPartitionSize (default 1MB)로 최종 partition의 크기가 결정됩니다. 

위에서 dafault 값으로 1MB이므로 120MB/1MB인 약 120개(118개)의 partition이 생성됩니다.

default는 true이지만, Spark 공식 문서에서는 이 값을 false로 설정할 것을 권장하고 있습니다. false로 설정할 경우, 최종 partition의 크기 역시 spark.sql.adaptive.advisoryPartitionSizeInBytes가 되고, 다음과 같이 output으로 2개의 partition을 생성합니다.

 

정리

  • Coalescing Post Shuffle Partition은 shuffle 이후 coalesce를 자동으로 해준다.
  • 이 기능은 너무 작고 많은 partition(task)의 생성을 방지한다.
  • 잘 활용하려면 shuffle partition의 수를 처음에 충분히 큰 수를 설정해야 한다.

 

설정 Default 설명
spark.sql.adaptive.coalescePartitions.enabled true spark.sql.adaptive.enabled와 같이 true라면 shuffle 이후의 partition의 크기를 spark.sql.adaptive.advisoryPartitionSizeInBytes에 맞추도록 합니다. 너무 작고 많은 partition(task)의 생성을 방지합니다.
spark.sql.adaptive.coalescePartitions.parallelismFirst true true일 경우 병렬성을 최대화하기 위해 (코어를 더 많이 사용하기 위해) partition의 크기를 spark.sql.adaptive.advisoryPartitionSizeInBytes를 무시하고, spark.sql.adaptive.coalescePartitions.minPartitionSize를 우선적으로 고려합니다. Spark 공식 문서에서는 이 값을 false로 설정할 것을 권장합니다.
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB partition의 최소 크기를 지정합니다. 이 값은 spark.sql.adaptive.advisoryPartitionSizeInBytes의 최대 20%까지 지정할 수 있습니다. 이 값은 spark.sql.adaptive.advisoryPartitionSizeInBytes가 무시될 때 사용됩니다.
spark.sql.adaptive.coalescePartitions.initialPartitionNum (none) coalesce 전 shuffle partition의 수를 지정해 줍니다. 설정되어 있지 않으면 spark.sql.shuffle.partitions와 동일합니다.
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB partition을 이 크기에 가깝게 최적화합니다.

 

Reference
https://spark.apache.org/docs/latest/sql-performance-tuning.html#coalescing-post-shuffle-partitions

 


 

함께 하면 좋은 글 

Latest Posts