728x90

Intro

이전에 두번이나 spark에 대해서 글을 정리 했었는데,

[Apache Spark]Join Strategy(조인 수행 전략/방식)

[Apache Spark]Spark Partition에 대한 간단한 정리

 

해당 개념에 대해서 정리하다보니 spark shuffle에 대해 자주 언급이 되길래 개인적으로 이 개념에 대해서 내가 얼마나 이해를 하고 있는걸까? 라는 생각이 들면서 한번 정리할 필요성을 느꼈다.

 

어째 다시 back to basic 하는 느낌이 들기는 하지만, 정석대로 차차 공부해가기엔 너무나 공부할 양이 많은 spark에 대해서 이렇게 관련된 개념들에 대해서 추가적으로 정리해가면서 공부하는 것도 나쁘지는 않겠다는 생각도 들었다.

 

이번에는 spark의 shuffle에 대한 개념에 대해서 정리해보려 한다. 

비교적 덜 복잡한 개념이다보니 가볍게 읽고 알고 있던 개념을 한번 정리하기에 좋은 글이 될 거 같다.

 

Spark Shuffle이란?

shuffle에 대해서 이해하려 한다면 막 섞여있는 카드를 떠올리면된다.

같은 팩 내에 들어있는 카드를 순서대로 정리하려고 할때, 카드가 마구잡이로 섞여있다면 우리는 A부터 K까지 하나하나 순서를 정리하고 같은 그룹내의 카드끼리 또 묶어야 할 것이다.

이렇듯 카드를 적절한 순서로 정리하기 위해 카드를 이동시키는 과정이 Shuffle이다.

  • shuffle은 spark에서 데이터를 재분배하는 방법이며,
  • 효율적인 spark application을 개발하기 위해 상당히 중요한 개념이다.

Shuffle 을 이해하기 위해서는, reduceByKey 의 작동 방식을 알아야한다.

reduceByKey 는 동일한 Key 를 가지고 있는, 모든 record 값을 취합하는 작업이다.

(A, 1), (A, 2), (A, 3) → (A, 6)

하지만, Spark 의 분산처리는 파티션 단위로 진행되기 때문에, 동일한 Key 의 모든 record 값을 취합하기 위해선, 동일한 Key 를 가진 튜플 데이터가 전부 같은 파티션에 있어야한다.

따라서, 모든 튜플 데이터가 여러 클러스터에 분산 저장되어 있을 때, 동일한 Key 를 가진 튜플 데이터를 동일한 파티션에 두기 위해, 데이터의 위치를 재조정하는 방법이 Shuffle 이다.

 

한 가지 예시를 들어보면. 테이블에 전화 통화 기록 목록이 있고 매일 발생한 통화량을 계산한다고 가정 해보자.

“날짜”를 키로 설정하고 각 레코드에 대해 값으로 “1”을 지정한 다음, 각 키의 값을 합산하여 결과 값을 계산할 수 있을 것이다.

만일 데이터가 여러 클러스터에 저장되어 있다면 어떻게 해야 동일한 키의 값을 합산할 수 있을까? 이를 위한 유일한 방법은 같은 키의 모든 값을 동일한 시스템에 두는 것이다. 그런 다음 이 값들을 합치면 된다.

만일 데이터가 이미 키 값으로 파티셔닝 되어 있고 키 값에 대해 변화를 주고 싶다면, 좌측의 그림처럼 수행하게 된다.

filter(), sample(), map(), flatMap() 등의 transformation이 이에 해당하며, 이 경우 Shuffle이 필요 없습니다.

이를 Narrow Transformation 이라고 한다.

반면, 서로 다른 파티션으로부터 특정한 값을 기준으로 추출하고 싶은 경우, 그 값을 기준으로 Shuffle이 발생하게 된다.

groupByKey(), reduceByKey() 등이 이에 해당하며, 이를 Wide Transformation 이라고 한다.

 

두 개의 테이블을 Join 할 때에도 Shuffle 이 발생할 수 있다. 위의 예시 처럼 두 테이블에서 키 값을 기준으로 Join 하게 되면, 동일한 키를 가진 데이터가 동일한 파티션으로 이동한다.

하지만 이 때, 셔플 되는 데이터의 양이 성능에 영향을 미칠 수 있다. 만일 C의 데이터의 크기가 A보다 훨씬 크다면, C에 대한 작업으로 인해 전체의 수행시간이 오래 걸리게 될 것 이다.

 

Shuffle의 유형

Map-side 셔플

이것은 네트워크를 통해 데이터를 전송하기 전에 단일 노드 내에서 데이터를 셔플 하는 것을 포함합니다. 이는 특히 데이터가 이미 분할되어 있거나 데이터의 일부만 셔플해야 하는 경우 전체 셔플보다 더 효율적일 수 있다.

Reduce-side 셔플

네트워크를 통해 다른 노드로 데이터를 셔플한 다음 추가 처리를 위해 데이터를 다시 파티셔닝 하는 방식이다. 일반적으로 데이터가 너무 커서 단일 노드에 맞지 않거나 특정 방식으로 데이터를 그룹화하거나 집계해야 할 때 사용된다.

 

Shuffle 성능 최적화

[셔플이 발생되는 함수들]
- groupByKey: 이 연산은 RDD의 각 키에 대한 값을 그룹화하므로 셔플이 필요하다. 이 작업은 특히 키에 대한 데이터가 여러 파티션에 분산되어 있는 경우 비효율적일 수 있다.
- reduceByKey: 이 연산은 RDD의 각 키 값에 함수를 적용하며 각 키의 값을 결합하기 위해 셔플이 필요하다. groupByKey와 달리 reduceByKey는 데이터를 셔플 하기 전에 각 파티션에 로컬로 reduce 함수를 적용하므로 더 효율적일 수 있다.
- aggregateByKey: 이 연산은 RDD의 각 키 값에 집계 함수를 적용하고 각 키의 값을 결합하기 위해 셔플이 필요하다. reduceByKey와 마찬가지로 aggregateByKey는 데이터를 셔플하기 전에 각 파티션에 로컬로 집계 함수를 적용한다.
- sortByKey: 이 작업은 RDD의 요소를 키별로 정렬하는 작업으로, 데이터를 키별로 그룹화한 다음 정렬하기 위해 셔플이 필요하다.
- join: 이 작업은 키를 기준으로 두 개의 RDD를 조인하며, 각 키의 데이터를 한데 모으기 위해 셔플이 필요하다. 이 작업은 셔플 오버헤드 측면에서 가장 비용이 많이 드는 작업이다.
- cogroup: 이 작업은 키를 기준으로 여러 RDD의 값을 그룹화.
- distinct: 이 작업은 RDD에서 중복을 제거하며, 데이터를 키별로 그룹화한 다음 중복을 제거하기 위해 셔플이 필요하다.
- repartition: 이 작업은 RDD의 데이터를 지정된 수의 파티션으로 재분배하며, 이 작업에는 셔플이 필요할 수 있다.
- zip: 이 작업은 길이가 같은 두 개의 RDD에서 요소 쌍을 생성하며, 동일한 인덱스를 가진 요소에서 쌍이 생성되도록 셔플이 필요하다.
- coalesce: 이 작업은 RDD의 파티션 수를 줄이며, 파티션 수를 늘리는 경우 셔플이 필요할 수 있다.

셔플을 유발하는 연산을 염두에 두고 신중하게 사용하여 성능 오버헤드를 최소화하는 것이 중요하다.

셔플 성능을 최적화하는 한 가지 일반적인 접근 방식은 groupByKey 대신 reduceByKey 및 aggregateByKey와 같은 연산을 사용하여 보다 효율적인 셔플을 수행하는 것 이다.

또한 파티션 수, 셔플에 사용되는 메모리 양, 네트워크 대역폭을 적절히 구성하면 셔플 성능을 개선할 수 있다.

 

  • 구성 매개변수 조정: Apache Spark는 셔플 성능을 최적화하기 위해 조정할 수 있는 여러 구성 매개변수(예: 셔플에 사용되는 메모리, 리듀서 수, 사용되는 직렬화 형식. 이러한 매개변수를 조정하면 성능이 향상되고 셔플의 영향을 줄일 수 있다.
    • Partitioning: 파티션 수는 셔플 성능에 영향을 미칠 수 있다. 데이터가 잘 분할되지 않으면 셔플링이 효율적으로 발생하지 않을 수 있다. 데이터 크기, 클러스터 구성 및 사용 가능한 메모리 양을 기반으로 파티션 수를 신중하게 선택하는 것이 중요하다.
    • Memory: 사용 가능한 메모리 양은 또한 임팩트 셔플 성능. Spark는 사용 가능한 메모리를 활용하여 디스크 I/O 양을 줄일 수 있는 메모리 기반 셔플 알고리즘을 사용한다. 따라서 셔플에 사용할 수 있는 메모리 양을 늘리면 성능이 향상될 수 있다.
    • Serialization: 직렬화는 데이터를 네트워크를 통해 전송할 수 있는 형식으로 변환하는 프로세스이다. 올바른 직렬화 형식을 선택하면 셔플 성능에 영향을 미칠 수 있다.
    • Network: 네트워크 대역폭과 대기 시간은 셔플 작업의 성능에 영향을 미칠 수 있다. 셔플 되는 데이터의 양을 처리할 수 있도록 네트워크를 적절하게 구성하는 것이 중요하다.
    • Configuration: Spark에는 셔플 성능을 최적화하기 위해 조정할 수 있는 여러 구성 매개변수가 있다. 예를 들어 spark.shuffle.compress를 true로 설정하면 셔플 단계에서 데이터를 압축하여 네트워크 I/O의 양을 줄일 수 있다.
  • 데이터 크기 줄이기: 셔플이 성능 문제를 일으키는 경우 데이터 양을 줄이는 방법을 모색할 수 있다. 한 가지 접근 방식은 필요한 데이터만 처리되도록 섞기 전에 데이터를 필터링하는 것이다. 또 다른 접근 방식은 섞기 전에 데이터를 미리 집계하여 섞을 데이터를 줄이는 것 이다.
  • 데이터 다시 분할: 경우에 따라 데이터를 다시 분할하면 셔플 성능이 향상된다. 여기에는 데이터 크기 및 클러스터 구성과 일치하도록 파티션 수를 늘리거나 줄이는 작업이 포함된다. 데이터 크기와 사용 가능한 리소스를 기반으로 최적의 파티션 수를 선택하는 것이 중요하다.
  • 적절한 데이터 구조 사용: 적절한 데이터 구조를 사용하면 데이터 양을 줄이는 데 도움이 될 수 있다. 예를 들어 브로드캐스트 변수를 사용하면 노드 간에 전송해야 하는 데이터의 양을 줄이는 데 도움이 될 수 있다.
  • 적절한 API 사용: Apache Spark는 다음과 같이 설계된 여러 API를 제공한다. 

 

Shuffle과 Partition 설정

관련 설정 : spark.sql.shuffle.partitions

이는 shuffle에 대해 가장 비용이 많이 드는 작업인 join 작업 시에 중요한 설정이다.

 

Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어나게 된다. Shuffle Spill이 일어나면, Task가 지연되고 에러가 발생할 수 있다.

 

Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결한다. 하지만, 모든 사람이 메모리가 부족하다고 메모리 할당량을 늘린다면, 클러스터가 사용성이 더 떨어지고 작업이 더욱더 실패하게 될 것이다.

그래서 Partition의 크기를 결정하는 옵션인 spark.sql.shuffle.partitions를 우선적으로 고려해 설정해야 한다고 생각한다.

 

하지만 무엇보다 가장 중요한 최적화 부분은 코드(쿼리)이다. 

즉 최적화의 우선순위는 쿼리 > Partition 수 > Core 당 메모리 증가이다.

 

쿼리는 최대한 groupBy로 집계를 한 후 Join을 하고 그다음에 Partition 수를 조절한 다음, 그래도 안된다면 Core 당 메모리를 증가시켜야 한다.

 

Partition 수를 증가시킨다면 Task 수도 늘어나서 실행 시간이 증가될 수 있지만, Shuffle Spill이 일어나지 않도록 한다면 시간이 더 감소된다. 따라서, Shuffle Spill이 일어나지 않게 하는 선인 Shuffle Partition의 크기를 100 ~ 200MB로 설정하는 것이 최적이다.

 

Spark Shuffle Properties

  • spark.shuffle.compress: 엔진이 shuffle 출력을 압축할지 여부를 지정
  • spark.shuffle.spill.compress: 중간 shuffle spill 파일을 압축할지 여부를 지정

Shuffle에는 위의 두 가지 중요한 Spark Property 가 있다. 둘 다 기본적으로 값이 “true”이며, spark.io.compression.codec 압축 코덱을 기본으로한다.

 

이외에도 Spark Shuffle 관련된 Property는 아래의 공식문서에서 확인할 수 있다. https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior

 

outro

결국 shuffle은 파티션을 기반으로 분산작업이 수행되는 분산시스템인 spark에서 있어서 필수불가결한 작업인 것이다.

shuffle에 대한 이해를 바탕으로 이와 밀접한 관련이 있는 파티션과, 메모리 사용에 대해 이해하고 제대로 설정을 해야 spark가 가지는 분산 시스템의 장점을 적극 활용할 수 있을것 같다.

 

Reference

[Apache Spark] 스파크의 셔플(Shuffle)에 대하여

Spark Shuffle Partition과 최적화

[Spark] Shuffle 이란?

Spark의 Shuffling 이해하기

Spark Performance Optimization Series: #3. Shuffle

Spark Optimization : Reducing Shuffle

728x90

+ Recent posts