728x90

Intro

회사에서 작년 말부터 업무적으로 spark를 많이 사용하게 되면서, 기존에 대략적으로만 이해하고 있던 spark에 대해서 이제 본격적으로 공부할 때가 되었다고 생각했다.

spark 공부의 정수인 [spark 완벽가이드]를 읽으면서 업무를 하던 와중에, spark join을 유독 많이 사용하게 되었고 이에 대해서 좀 더 자세하게 이해할 필요가 있었다.

따라서 spark 중에서도 join, 그리고 이 join에는 어떤 전략들이 존재하는지를 공부해보았다.

 

일단 본인이 spark로 업무를 하면서 대부분의 작업들이 join인데, 무작위로 작업하면서 OOM에러도 많이 발생시키고

심지어 cluster들을 수없이 죽여서 cluster 살인마라는 별명까지 얻기에 이르른...😂

 

따라서 이번글에서는 spark에서 join을 하기 위해 어떤 방식들을 선택할 수 있는지 각 방식들에 대한 설명을 정리해보았다.

 

이번 글은 spark에 대해 엄청 어려운 개념을 설명한 글은 아니지만, 그렇다고 완전 기초적인 글도 아니므로,

읽기전에 어느정도 spark에 대한 기초지식이 필요할 것 같다.

 

Spark Join

간단하게 spark join에 대해서 이야기해보자면 왼쪽과 오른쪽에 있는 각 데이터셋에 있는 하나 이상의 키값을 비교하는 것이고, 이를 통해 두 데이터 셋을 하나로 합쳐서 새로운 데이터셋을 만드는 것이다.

 

spark에서 join 작업을 하기 위해서는 사전에 크게 두가지를 고민하고 결정해야 한다.

첫번째로는 join type을 결정해야 한다. 이는 조인 이후 결과 데이터 셋에 어떤 데이터가 있어야 하는지를 결정하는 것이다.

예를들어 inner join으로 두 데이터 셋에 키가 있는 로우만 유지할지, left (outer) join으로 왼쪽 데이터 셋의 로우는 모두 유지하고 오른쪽 데이터 셋은 키가 있는 로우만 유지할지 등을 고민해야한다.

 

조인타입에는 크게 8가지가 있다.

-> inner join/ outer join/ left (outer) join / right (outer) join/ left semi join/ left anti join / natural join / cross join

 

조인 타입에서 대해서는 데이터를 다루기 위해 sql 쿼리를 공부할 때도 많이들 공부하는 부분이다보니 더 자세히 설명은 하지 않을 예정이다.

 

두번째로 고민해야 하는 것이 조인을 수행하는 방식이다. 

이는 말그대로 두 데이터셋을 조인할 때 어떠한 네트워크 통신 전략을 쓰며, 노드별로 어떠한 연산 작업을 통해서 작업자가 원하는 조인타입대로 데이터 셋을 구현하는 지에 대해 고민하고 결정하는 것이다. 

 

Spark Join Strategy

spark에서 join은 서로 다른 두 데이터셋에 대해 조건으로 건 키값을 기준으로 조회하고 합치는 작업을 진행하며, 실제로 이 작업들이 partition 단위로 분산되어 있는 노드간의 통신을 통해 이루어 진다. 

이 말은 즉슨, spark에서 join은 매우 값비싼 작업이라는 것이다.

 

join이 수행되는 방식을 어떻게 결정하느냐에 따라서 주어진 메모리 내에서 빠르게 효율적으로 작업이 수행될 수도 있는 반면에, 

계속해서 OOM에러를 발생시키고 더 나아가서 클러스터를 괴롭히며 죽여버릴 수도 있게 되는 것이다.(마치 내가 저지른 것처럼...)

 

join이 수행되는 방식을 결정하기 위해서 세부적으로 고민해야 하는 것이 두가지로 또 나눌 수 있다.

  1. 노드간 네트워크 통신전략 : join 과정에서 두 데이터셋 간의 데이터를 어떻게 이동시킬것이냐?
    • broadcast join : 노드간 통신을 유발하지 않음
    • shuffle join : 전체 노드간 통신을 유발
  2. 노드별 연산 전략
    • Hash, Sort Merge , Nested Loop 등

Broadcast Join

작은 사이즈의 테이블을 driver을 거쳐 각 worker node에 copy하여 뿌리는 방식이다. 즉 작업이 진행될 워커노드에 join 해야하는 데이터셋중 하나를 미리 이동시켜 놓는 것이다.

  • 테이블이 단일 워커 노드의 메모리 크기에 적합할 정도로 충분히 작은 경우, 조인 연산 최적화가 가능하다.(default 10M)
  • 결국 작은 데이터 셋의 카피본이 네트워크를 통해 각 워커 노드에 이동되는 것이므로, 각 데이터의 사이즈가 충분히 작아야 한다. 이는 각 워커 노드에 대해 설정해둔 메모리 사이즈 기준이다.
  • 결국 노드간 통신이 발생하는 것 아닌가? 할 수 있지만 복제당시 초기에만 대규모 통신이 발생하고, 이후에 조인 프로세스 내내 노드간 통신이 이루어지는 현상이 발생하지 않는다. 
  • 즉 조인 시작 시, 단 한번의 복제가 수행되고 그 이후로는 개별 워커가 다른 워커를 기다리거나 통신할 필요가 없다. 
  • 큰 데이터셋과 작은 데이터셋을 조인할때 유용하며, 이는 map-sied join으로도 알려져 있다.
map-side join

start schema 형태에서 큰 테이블(fact table)과 비교적 작은 테이블(dimension table)을 조인할때 큰 테이블의 데이터를 네트워크를 통해 모두 전달할 필요가 없도록 해주는 방식이 map-side join이다. 
하둡 커뮤니티에서 map-side join이라고 하며 다른 분산시스템에서는 broadcast join이라고 불리운다. 

Shuffle Join

  • 전체 노드간 통신이 발생한다.
  • 네트워크가 복잡해지고 많은 자원을 사용한다. 
  • 전체 조인 프로세스가 진행되는 동안 모든 워커 노드에서 통신이 발생한다.

Hash Join

Hash Join은 작은 데이터 셋의 키를 조회하면서 자주 접근될 메모리에 Hash 셋을 만들고, 큰 테이블을 순회하면서 Hash 셋을 조회하면서 Join하는 연산 방식이다. 

Broadcast Hash Join

Broadcast Hash Join은 작은 데이터셋을 Broadcast 한 후에 Executor에서 Hash Join을 수행

  • Broadcast Hash Join은 사용자가 Hint를 지정하거나, 지정하지 않았더라도 한쪽 테이블 사이즈가 spark.sql.autoBroadcastJoinThreshold = 10 MiB (Default) 보다 작으면 실행
  • Broadcast될 대상 테이블이 클 경우 Driver를 거쳐서 Broadcasting 이 발생하므로 Driver OOM 또는 해시 테이블로 인한 Executor OOM이 발생할 수 있음
  • 모든 단일 노드에서 개별적으로 join이 수행되므로, CPU가 큰 병목 구간이 될 수있다.
  • '=' join만 지원한다.
  • full outer join 외에 모든 조인 타입을 지원한다.

Shuffle Hash Join

Shuffle Hash Join은 Shuffle을 발생시켜 데이터를 이동한 뒤 Hash Join을 수행

  • 작은 쪽, 즉 메모리에 Hash 셋을 만들 테이블은 spark.sql.autoBroadcastJoinThreshold (10MiB, Default) * spark.sql.shuffle.partitions (200, Default) = 2GiB 보다 작아야 함
    • Spark 2.3부터 Shuffle Hash Join 대신 Shuffle Sort Merge Join이 기본 전략으로 세팅됨(spark.sql.join.preferSortMergeJoin = true, Default)
  • Shuffle Hash Join은 큰 데이터셋을 다룰 수 있으나 Hash 셋을 빌드할 때 메모리에 올려야 하므로 너무 크다면 Executor OOM이 발생할 수 있음
  • '=' join만 지원한다
  • join key가 정렬되어야 할 필요는 없다
  • full outer join 제외하고 모든 조인 타입을 지원한다. 
  • 해당 조인 방식이 매우 expensive한 방식이라는 의견이 있다. 셔플 뿐 아니라 hashing 방식 과정에서 많은 통신과 메모리를 필요로 하기 때문이다. 특히 hash table을 유지하기 위해서도 메모리와 computation을 많이 요구하게 된다. 

(shuffle) Sort Merge Join

모든 노드 간의 all-to-all communication 방식

  • 먼저 실제 join 작업을 수행하기 전에 파티션들을 정렬한다. (이 작업만으로도 비용이 크다)
  • 정렬된 데이터들을 병합하면서 join key가 같은 row들을 join한다.
  • Sort Merge Join은 Shuffle Hash Join과 비교할 때, 클러스터 내 데이터 이동이 더 적은 경향이 있다.
    • hash set을 만드는 작업이 이루어지지 않기 때문이라고 추정한다.
  • Join될 파티션들이 최대한 같은 곳에 위치해야 한다. 그렇지 않으면 파티션들을 이동시키기 위해 대량의 shuffle이 발생한다.
  • DataFrame의 데이터가 클러스터에 균등하게 분배되어 있어야 한다. 그렇지 않으면 특정 노드에 부하가 집중되고 연산 속도가 느려진다.
  • Spark 2.3부터 spark.sql.join.preferSortMergeJoin = true가 디폴트로 활성화 되어있어, 큰 데이터셋에 대해 주로 사용
  • Join Key가 정렬 가능해야함
  • Join 과정에서 Shuffle Hash Join과 달리 Memory가 아닌 Disk를 이용할 수 있기 때문에 OOM이 발생하지 않음
  • 모든 join 타입을 지원한다.

Broadcast Nested Loop Join

Broadcast Nested Loop Join은 선택할 수 있는 전략이 없을 경우 마지막으로 선택되는 Join 전략이다.

작은 데이터셋을 Broadcast한 후 이중 반복문을 돌며 하나씩 데이터를 비교해 Join하는 방식이다.

어떻게 Join 수행 방식을 결정하면 좋을까?

  • broadcast join : 한 쪽의 데이터셋이 충분히 작은 경우 해당 방식을 선택하는 것이 좋다
  • shuffle hash join : broadcast를 하기에는 데이터의 양이 많으나, hash map을 짜기에 한쪽 데이터양이 작은 경우
  • sort-merge join : join key가 정렬가능할 때
  • broadcast nested loop join : 이는 가장 최종적으로 고려하는 방식이다. 이는 OOM이 발생할 수 있다.

join strategy 설정방법

아래의 힌트를 spark config에 설정하면 원하는 방식의 join를 적용해서 사용할 수 있다.

  • BROADCAST
    • 힌트 사용시 autoBroadcastJoinThreshold 옵션을 무시하고 동작
  • MERGE (SHUFFLE_MERGE 과 동일)
    • Sort-mege 조인을 사용
  • SHUFFLE_HASH
  • SHUFFLE_REPLICATE_NL : nested loop join을 사용하기 위한 힌트이다.

만약 JOIN 대상 양쪽 테이블에 Join 을 위한 다른 Hint 가 각각 지정된다면 Spark 는 다음의 우선순위를 이용해 결정한다.

  • BROADCAST > MERGE > SHUFFLE_HASH > SHUFFLE_REPLICATE_NL
  • 양쪽 테이블에 모두 똑같은 Join Hint 가 지정된다면 (Broadcast 및 Shuffle Hash) Spark 는 테이블의 사이즈를 고려해 작은쪽을 기반 테이블로 사용한다.

효율적인 join을 방해하는 것

  • Data Skewness : join key가 클러스터에 균일하게 분포해 있지 않으면 특정 파티션이 매우 커질 수 있다. 이는 Spark이 parallel하게 연산을 수행하는 것을 방해한다.
  • All-to-all communication : broadcast join이 아닐 경우, 두 DF의 데이터 모두에서 대규모 shuffle이 발생한다.
  • Limited executor memory

Data Skewness를 해결하려면?

  • Repartitioning : 단순히 repartition을 수행하는 것으로 데이터를 파티션들에 더 골고루 분배할 수 있다.
  • Key Salting : 근본적으로 파티셔닝되는 칼럼 키값에 salting을 적용하여 키가 고르게 분배될 수 있도록 할 수 있다.
Key Salting이란

한쪽으로 키값이 몰려있는 데이터를 random int 값을 데이터 뒤에 붙여서 키값 자체를 재분배하는 기술이다. 

Outtro

처음 spark join 작업을 할때는 결과물만 고려해서 join type에 대해서만 열심히 고민했었다.

그러나 실제로 spark를 사용하는 목적자체가 좀 더 효율적으로 큰 데이터를 다루기 위함이고, 이러한 목적에 맞게 spark를 잘 사용하기 위해서는 수행방식에 대해서도 잘 알고 고민해야겠다고 느꼈다.

 

spark join에 대해서 공부하다보니, 관련해서 partition에 대해서도 많이 언급되고 있어서 다음 글로는 spark partition에 대해서 좀 더 공부를 해볼까 하는 생각이 들었다.

그러나 무엇보다도 기본적으로 spark와 관련된 기본적인 개념을 조금은 알고 있어야 관련해서 필요한 여러 개념들을 더 깊이 이해할 수 있겠구나라는 생각이 들어서 spark 완벽 가이드를 빨리 완독해야겠다는 생각도 들었다..!

 

Reference

spark join에 대한 정리

spark 공식문서

효율적인 spark join 전략

spark join strategy - how & what

Map-Side Join in Spark

728x90

+ Recent posts