Intro
지난번에는 spark join에 대해서 정리를 해보았는데, 공부할 때마다 은근 많이 언급되는 개념이 spark partition이었다.
partiton/partitioning에 대해서 대략적으로는 어떤 느낌인지 이해는 가지만 명확하게 어떤 개념인지 설명하기가 어렵다는 생각이 들어서, 이번에는 spark partition에 대해서 간단하게 정리를 하고 넘어가려 한다.
사실 spark partition에 대해서도 깊이 파고들면 끊임없이 공부할 부분이 많은 개념이다보니 이번에는 partition의 개념과 종류에 대해서, 그리고 그 각 파티션에 대한 간단한 개념만 짚고 넘어가려한다.
Spark Component
spark partition에 대해서 이해하고자 할때 먼저 이해하고 넘어가면 좋은 부분이 바로 spark의 구성에 대한 개념이다.
spark는 대표적인 분산 시스템 중 하나이며 cluster라는 일종의 하나의 컴퓨터가 있다면
아래와 같은 구조로 각각의 resource(memory, storage)를 지닌 node가 있다.
정리하면 cluster > node > core > executor > task = partition 으로 세부적으로 나뉘어서 각 작업이 실행된다.
여기서 알 수 있듯이 partition은 하나의 큰 데이터가 분산되어 작업이 실행되는 가장 작은 단위인 것이다.
spark의 component에 대해서는 하단의 공식문서 링크를 참고하면 금방 이해할 수 있을 것이다.
partition을 이해하기 전에 기본적으로 Node, task 등의 내부 구조에 대해 조금은 짚고 넘어가야 이해가 쉬울 것이라 생각해서 간단히만 정리해보았다.
Spark Partition이란
위에서 한번 언급한대로 partition은 하나의 큰 데이터를 다루는 RDD나 Dataset을 구성하고 있는 최소단위 객체이다.
어떻게 보면 분산 시스템인 spark의 전반적인 성능과 리소스 점유량을 좌우할 수 있는 가장 기본적이고 중요한 개념이다.
partition이 가장 최소 작업 단위라면, partitioning은 이러한 최소 작업 단위를 쪼개기 위한 작업로직, 매커니즘을 의미한다.
spark의 구조에서 알수있듯이 spark 내에서 가장 최소의 연산이 task이며, 이 하나의 task에서 하나의 partition을 처리한다.
따라서 1 task = 1 partition인 것이다.
그리고 이 task가 하나의 core에서 처리되는 것이다.
이 말인 즉슨, partition이 많다 = task가 많다. core수가 많이 필요함을 의미하며/
반대로 partition이 크다 = 하나의 task와 core에서 이 partition내의 데이터를 처리하기 위한 리소스가 많이 필요하다 = 메모리크기가 커야함을 의미한다.
따라서 병렬화의 정도를 늘리고 메모리를 적게 사용하려면 partition의 수를 늘리면 되는 것이다.
그러면 메모리등의 리소스를 최소한으로 사용하도록 partition의 수를 최대한으로 설정해주는 것이 좋을까?
이 또한 아니다.
partition의 수를 너무 많이 설정했다면 과도한 리소스 사용을 일으키게 된다.
너무 많은 partition들이 생성되어 너무 많은 task에서 연산이 실행되는데, 실제로 해당 spark의 각 core와 task가 가진 메모리에서는 더 많은 데이터를 처리할 수 있는데 충분히 활용하지 못하는 반면 너무 많은 task가 실행되게 때문이다.
이렇듯, spark partitioning은 정해진 리소스 내에서 데이터를 효율적으로 쪼개서, 동시에 여러 파티션에서 데이터 작업을 병렬적으로 수행하여 작업이 좀 더 빠르게 완료되도록 해준다.
기본적으로 spark의 partition은 작업이 이루어지는 클러스터가 지닌 core수와 동일하게 생성된다.
즉 2개의 cluster > 각 클러스터는 node를 3개씩 가지고 있고 > 각 node가 4개의 core를 가지고 있다면, 기본적으로 2 * 3 * 4 = 24개의 core를 가지고 있고 partition 또한 24개가 생성된다.
그렇다고 core수에 따라서 partition 수가 고정되어있고, 이에 따라 Partition의 수를 조정하려면 core수 자체를 조정해야 하는 것은 아니다. 특정 설정값을 통해서 partition의 수를 조정할 수 있다.(repartition, coalesce)
최적화된 partition 수는 데이터의 크기에 따라 다르며, 클러스터 내의 노드의 수, 그리고 각 노드에서 사용할 수 있는 리소스에 따라 다르다.
그리고 각 partition에 작업의 최소 연산 단위인 task가 생성된다. 이렇게 쪼개진 각 partition들은 독립적으로 작업이 이루어진다.
Spark Partitioning의 종류
그렇다면 spark에 partition을 생성하는 partitioning은 어떤 방식으로 이루어질까?
partitioning은 크게 두가지 방식이 있다.
- hash partitioning
- 해당 partitioning에서는 rdd의 각 키에 해시코드를 할당하고, 각 partition이 해당 해시코드를 기반으로 생성되도록 한다.
- 따라서 모든 partition들 사이에서 균등한 분산을 보장한다.
- range partitioning
- 해당 partitioning에서는 데이터가 각 값의 범위에 기반한다. 예를 들어 특정 데이터가 일련의 숫자들이라면 spark는 각 숫자들을 범주화하고 각 범위에 기반해서 데이터를 파티셔닝한다.
기본적으로 spark는 hash partitioning을 사용한다.
Spark Partition의 종류
partiton이 쓰이는 때, 즉 spark에서 실행하고자 하는 작업의 종류에 따라 partition을 구분할 수 있다.
input / output / shuffle partition이며, 각 partiton에 대해서 간단하게 정리해보았다.
Input Partition
spark.conf.set("spark.sql.files.maxPartitionBytes",bytes)
- 처음 파일을 읽을때 생성하는 partition이다.
- 하나의 파일(데이터)을 읽을때 각 partition이 해당 데이터에서 어느 정도의 크기(byte)를 담당해서 작업하게 할 것인지를 지정해주는 것이다. 즉 크기를 키울수록 적은 partition을 쓰게 될 것이다.
- 기본값은 134217728(128MB)
- hdfs 상의 마지막 경로에 존재하는 파일의 크기가 128mb보다 크다면 스파크에서 128mb만큼 쪼개면서 파일을 읽는다
- 파일의 크기가 작다면 그대로 읽어들여 파일 하나당 파티션 하나가 된다.
- file-based source인 json, parquet, orc 같은 것에만 적용된다.
- 대부분의 경우, 필요한 칼럼만 골라서 뽑아 쓰기 때문에 파일이 128MB보다 작다. 가끔씩 큰 파일을 다룰 경우에는 이 설정값을 조절한다.
Output Partition
coalesce(cnt) vs repartition(cnt)
- Output Partition은 파일을 저장할 때 생성하는 Partition이다.
- 이 Partition의 수가 HDFS 상의 마지막 경로의 파일 수를 지정한다.
- 즉 partiton의 수를 많이 설정했다면, 이는 각 partition에서 담당하는 파일의 크기가 적다는 의미이다.
- 기본적으로, HDFS는 큰 파일을 다루도록 설계되어 있어, 크기가 큰 파일로 저장하는 것이 좋다.
- 보통 HDFS Blocksize에 맞게 설정하면 되는데, 일반적으로 Hadoop 클러스터의 HDFS Blocksize는 268435456 (256MB)로 설정되어 있어서, 통상적으로 파일 하나의 크기를 256MB에 맞도록 Partition의 수를 설정하면 된다.
- coalesce는 파티션을 줄일 때 사용하고, repartition()은 파티션 수를 늘리거나 줄일 때 사용한다.
- Partition의 수는 df.repartition(cnt), df.coalesce(cnt)를 통해 설정한다.
- repartition은 각 노드들 간에 데이터를 shuffle 하게 되는데, 이는 파티션의 수를 재생성하기 위해서이다.
- 반면에 coalesce는 파티션의 수를 줄이며 이는 각 노드간이 아니라 내에서 데이터를 shuffling 함으로써 이루어진다.
여기서 언급된 데이터 shuffle의 개념에 대해서 간단하게 정리해보려 한다.
spark에서 어떤 작업을 하든 그리고 이에 대해 공부를 한다면 가장 많이 언급되는 개념 중 하나가 shuffle인듯 하다.
spark shuffle의 가장 기본적인 개념은 하나의 파티션에서 다른 파티션으로 데이터를 이동시키는 것이다.
이에 따라 partition을 생성 혹은 재생성 해주는 partitioning은 설정에 따라서 단순히 partition간이 아니라 node간의 데이터 이동을 발생시키는 매우 비싼 작업이다.
기본적으로 df shuffle은 200개의 파티션을 생성한다.
즉 위에서 언급된 repartition과 coalesce 둘다 파일을 내보내서 저장할 때 partition의 수를 재설정해주는 함수인데,
repartition의 경우는 node들 간의 shuffle을 발생시키고, coalesce는 기존의 partition의 수를 줄이기 위해서 node 내에서 각 partition 간의 shuffle을 발생시킨다는 차이점이 있는 것 같다.
이는 추후에 한번 더 자세하게 다뤄볼 예정이다
Shuffle Partition
spark.conf.set("spark.sql.shuffle.partitions",num of partition)
- spark.sql.shuffle.partitions 옵션은 join,groupBy 혹은 aggregation 같은 연산을 할 시 data shuffling되는 파티션 수를 나타낸다.
- 파티션의 개수는 default로는 200으로 지정 되어 있으며, spark.conf.set("spark.sql.shuffle.partitions", 1800)와 같은 형태로 조정이 가능하다.
- Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어난다.
- 이때 Task가 지연되고 에러가 발생 -> 하둡클러스터에 에러가 나고 spark 강제종료되게 된다.
- Memory Limit Over와 같이, Shuffle Spill도 메모리 부족으로 나타나는데, 보통 이에 대한 대응을 Core 당 메모리를 늘리는 것으로 해결
- 일반적으로, 하나의 Shuffle Partition 크기가 100~200MB정도 나올 수 있도록 수를 조절하는 것이 최적
- 클러스터의 익스큐터 수보다 파티션의 수를 더 크게 지정하는 것이 대체로 좋다. executor num > partition num
- 로컬 머신에서 처리할 경우 병렬로 처리할 수 있는 task가 제한적이므로 이 값을 작게 설정
shuffle partition은 무엇보다 이전에 정리한 join과도 밀접하게 연관된 개념이라고 볼 수 있다.
각 partition의 종류에 대해 정리해보니, 결론적으로 spark로 데이터를 다룰 때 크게 세번의 partitioning을 해야하는 것 같다.
우선 파일을 읽을 때, 정해진 파일의 크기를 적절하게 분산작업할 수 있도록, 각 분산 작업이 담당할 파일의 크기를 지정해주고,
작업 도중 join, group by 등의 복잡한 연산을 할 시에, 주어진 리소스 내에서 메모리와 core를 최대한 잘 활용할 수 있도록 partition의 수를 재설정 해줘야한다.
그리고 이렇게 변환 작업된 파일을 저장하여 내보낼 때 다시 partition 수를 설정해주어야 하는데, 이땐 변환 작업 완료된 파일의 크기를 잘 파악해서 각 partition이 효율적으로 담당할 수 있는 파일 크기와 partition 수를 잘 지정해주어야 한다. 즉 무작정 partition의 수를 많게 한다고 해서 효율적인 것은 아니다.
Outro
이번에 글또 활동을 하며 공부하고 정리해볼 큰 주제를 spark로 잡았다고 했을 때 주변에서 공부할게 엄청 많을거라고 하더니,
정말 하나의 주제를 잡고 공부하다보면 꼬리에 꼬리를 물고 새로운 주제가 나오는 것 같다.
그 주제들을 다 하나로 묶어 정리하려 하면 밑도 끝도 없을거 같아서 이번처럼 적당히 끊고 다음에 더 자세하게 다뤄보는 식으로 진행해보려 한다.
이런 의미에서 다음에는 shuffle 혹은 repartitioning에 대해서 정리해보려 한다.
아무래도 저번에 정리한 join과도 관련되며 이번에 정리한 partition을 더 깊게 이해하고 활용하는데 있어서 위의 두 개념 중 하나를 이해하는 것이 중요하다는 생각이 들었기 때문이다.
Reference
'DE > Skill' 카테고리의 다른 글
[Apache Spark]Join Strategy(조인 수행 전략/방식) (0) | 2023.02.25 |
---|---|
Apache Sqoop의 등장과 은퇴(feat. Apache Sqoop projects moved to the Apache Attic) (0) | 2022.08.01 |
[Kafka] Kafka 기본 개념 정리 (0) | 2022.03.24 |
[Spark] Spark 기본개념 정리 (0) | 2022.03.24 |
[Database] apache cassandra (0) | 2022.03.23 |