[Hadoop_기초] Udemy course - 섹션 2.MapReduce
MapReduce는 HDFS, YARN과 함께 제공되는 Hadoop의 핵심 기술이다.
1. Hadoop에 내재된 기능으로, 클러스터에 데이터의 처리를 분배해주는 역할
-> 내가 정의한 mapper/reducer 함수에 의해 mapped(transformed)되고, reduced(aggregated) 된 파티션으로 데이터를 분배하는데 이러한 과정이 클러스터에 걸쳐 병렬 처리된다.
2. 이러한 과정을 전반적으로 관리하고 실패가 생겼을 때 감당하는 역할
맵리듀스가 하는 일
MapReduce는 기본적으로 두 가지 일을 한다.
데이터를 매핑하고 리듀싱한다.
1. Mapping(매핑)
데이터가 한 줄씩 들어오면 매퍼는 그 데이터를 변형시킨다.
들어오는 데이터에서 필요한 정보를 추출하고 이해할 수 있는 구조로 정리하는 것이다.
모든 입력 줄(input line)마다 매퍼는 중요한 데이터를 추출하고 구조화한 후 데이터를 출력한다.
2. Reducer(리듀서)
리듀서는 데이터를 집계한다.
데이터를 매핑할 때는 데이터를 집계하고자 하는 중요한 것, 곧 특정한 키-값과 연관시킨다.
맵리듀스의 작동방식
MapReduce도 역시 실패를 처리하는 메커니즘을 제공한다.
예를 들어서 MovieLens 데이터 세트를 살펴보자(유데미 강의에서 예시로 활용하는 데이터세트이다)
그 안에는 사용자와 영화, 1~5점 사이의 평점 그리고 100,000개의 평점 데이터가 있다.
그 데이터 세트 내의 각 사용자들이 얼마나 많은 영화를 평가했는지 세어보자.
밥은 사용자 ID 1을 가지고 있고 영화 '개미', '포레스트 검프', '덤보'를 평가했다.
샐리는 사용자 ID 2를 가졌고 '라이온 킹', '니모를 찾아서', '토이 스토리'를 평가했다.
여기서 밥은 세 편의 영화를 평가했고 샐리도 세 편의 영화를 평가했다는 것을 알아내고자 한다.
이걸 어떻게 MapReduce 문제로 만들 수 있을까?
아래를 참고하여서 실행과정을 이해해보려 한다.
1. Mapper : 입력 데이터를 적절한 양식으로 변환
매퍼는 입력 데이터의 각 줄인 영화 평점을 가져와 키-값 쌍으로 전환한다.
여기서 키는 집계하고자 하는 항목이다. 이 상황에서는 각 사용자가 시청한 영화의 수를 집계하려는 것이고 그럼 키는 사용자 ID고
값은 우리가 집계하는 영화이다.
첫 번째 줄에는 사용자 ID 196이 영화 ID242를 이 시간에 3점으로 평가했다.
이 호출에서 매퍼는 키-값 쌍 196:242를 추출한다
여기서 중요한 것은 사용자 ID 196이 242번 영화를 평가했단 거고 이런 식으로 각 입력줄을 호출한다
이 때 필요없는 정보는 매퍼가 추출하지 않는다. 니는 최적화를 위해서는 매우 중요하다.
클러스터 네트워크에 오고 가는 데이터가 적을수록 더 좋기 때문이다.
매퍼의 주된 목적은 필요한 정보를 추출하고(이 의미는 필요없는 정보는 제외하고) 적절하게 구조화하는 것이다.
매퍼 함수가 작업을 끝내고 나면 MapReduce 프레임워크는 이와 같은 키-값 쌍 목록을 클러스터 어딘가에 보관한다.
2. 키:값 쌍 데이터에 대한 Shuffle/Sort
각 고유 키에게 주어진 값들을 집계하고 키를 정렬한다.
이 데이터 세트에선 사용자 ID 166은 하나의 영화만 평가한 것을 알 수 있다.
두 번째 키, 그러니까 두 번째 사용자는 186번이고 보이는 것처럼 186번 사용자는 302, 274와 265번 세 편의 영화를 평가했다
각각의 매퍼는 주어진 일부 데이터만 처리하고 난 이후에 전체 클러스터에서 이러한 집계와 정렬 과정을 진행하는 것이다.
MapReduce의 진정한 힘은 클러스터 전체에 걸친 이 모든 값을 집계하고 한자리에 가져와 필요에 따라 처리하는 데에 있다
3. Reducer : 데이터 최종 집계
이 예에서는 'movies'라고 불리는 영화의 목록을 받았고 목록의 길이란 뜻의 'len'이라는 연산자를 호출한다.
이 데이터가 리듀서에게 넘어오면 리듀서는 각각의 고유 키를 한 번씩 호출한다
만약 여러 리듀서가 여러 컴퓨터에 걸쳐 동시에 병렬로 작업 중이라면 개별 리듀서는 주어진 양의 키를 처리해야 하고 리듀서는 각각의 키를 한 번만 호출한다
예를 들어 사용자 ID 166를 호출하면 영화 목록에 하나밖에 없으니 사용자 ID 166의 출력은 1이 되는 것이다.
사용자 ID 186은 세 편의 영화가 목록에 있으므로 영화의 수를 세면 사용자 ID 186은 3편의 영화를 본 것이다.
맵리듀스의 분산처리 방식
이전의 예시 데이터세트를 다시 살펴보며 정리해보면,
영화 평점 데이터에서 시작해 매퍼가 키-값 쌍을 추출했고 셔플과 정렬은 각 고유 키와 연관된 값을 구조화하고
리듀서는 구조화된 정보를 전달받아 최종 출력물을 생산한다
만약 정말 큰 데이터 세트를 가진 클러스터를 운영하고 있었다면 아마 처리 과정을 여러 컴퓨터에 배분하거나 적어도 여러 작업(task)에 걸쳐 진행했을 것이다.
이 예시를 조금 바꿔서 생각해 보자.
1. mapping
Hadoop과 MapReduce가 이 작업을, 세 노드에 나눠 매핑한다.
그러면 데이터를 가져오며 크게 세 부분으로 나눈다. 맨 위의 몇 줄을 한 노드에게 보내 처리하게 하고 다음 몇 줄을 다른 노드에게 마지막 몇 줄을 세 번째 노드에게 보낸다.
입력 데이터를 여러 파티션에 끼워 맞추고 각 파티션에 작업을 할당한다. 매핑 단계에서는 다른 파티션에 있는 줄을 신경쓰지 않아도 되니
작업을 병렬화하기에 수월하다.
입력 데이터 한 덩어리를 한 컴퓨터에서 매핑하고 다른 덩어리는 또 다른 컴퓨터에서 동시에 매핑하면 Hadoop은 이 작업이 끝나면 정보를 잘 받아오기면 하면 된다.
이렇게 하면 중요한 정보를 추출해 키-값 쌍으로 전환하는 매핑 처리 과정을 배분할 수 있다
2. shuffle & sort
다음은 셔플과 정렬 작업이다. 여기서 여러 머신에서 같은 키가 나올 수도 있다. 그러면 같은 키끼리 모아 이후에 리듀서로 보내야 한다.
여기 196번 사용자의 평점이 두 노드에 걸쳐있고 셔플과 정렬 작업에서 이 둘은 함께 집계될 때 MapReduce가 이 작업을 대신 수행한다.
이 모든 정보를 '합병 정렬'(merge sort) 한다. 그리고 리듀서 단계로 보낸다.
3. reducer
또 여러 리듀서로 보내진 데이터는 동시에 처리된다. 각 노드는 주어진 키 세트를 리듀싱한다.
리듀서가 두 개가 있다고 보면, 하나는 166번, 186번 사용자를 처리하고 다른 하나는 196번, 244번을 처리한다.
주어진 키에는 리듀서가 필요한 모든 정보가 담겨있어서 여러 노드에서 주어진 키 세트를 동시에 리듀싱할 수 있다
모두 마친 후에는 최종 결과가 클라이언트 노드에게 전달된다
간단히 말하자면, MapReduce는 이런 방법으로 작업을 클러스터 전체에 분배한다.
a. 매핑 단계에서는 입력 데이터를 덩어리로 쪼개 여러 컴퓨터로 보내고
b. 셔플과 정렬 작업 이후에는 여러 컴퓨터가 각각의 키 세트를 담당하여 리듀싱한다.
내부 작동 방식
1. 클라이언트 -> YARN 리소스 관리자 / HDFS
YARN은 리소스 매니저이며, Hadoop의 핵심 요소로써 어떤 머신에서 무엇을 실행할지 관리한다.
그러기 위해 클러스터의 어떤 머신이 가용하고 어떤 머신의 성능은 얼마인지 등의 정보를 기억하고 있다
클라이언트는 리소스 매니저에게 이러이러한 MapReduce 작업이 필요하다고 말한다.
그러는 동시에 필요한 데이터를 HDFS나 적절한 분산 파일 시스템에 복사한다.
이 작업을 수행할 노드들이 데이터를 액세스할 수 있도록 데이터를 미리 복사해두는 것이다.
2. Node Manager - 'MapReduce 애플리케이션 마스터' 작동
기본적으로 MapReduce가 작동하는 모든 것은 노드 관리자가 관리한다.
노드 관리자는 어떤 노드가 무얼 하고 있고 사용 가능한지, 작업 중인지 등을 관리하는 것이다.
애플리케이션 마스터는 개별 매핑과 리듀싱 작업을 주시하고 리소스 매니저와 협업해 작업을 클러스터에 걸쳐 배분한다.
예를 들어 리소스 매니저를 제외하고 클러스터에 두 개의 머신이 있다고 하자.
둘 다 매핑 작업을 하고 있고 서로 다른 컨테이너나 JVM(Java 가상 머신)이 실행된다.
하지만 같은 노드 관리자를 두고 있고 노드 관리자는 애플리케이션 마스터와 소통하며 어떤 머신이 어디서 무얼 하는지 추적한다.
두 번째 머신은 리듀싱 작업을 하고 있을 수도 있고 다른 노드 관리자가 있을 수도 있다.
이러한 매핑과 리듀싱 작업이 진행되는 동안 HDFS 클러스터와 소통하며 필요한 데이터를 받고 작업이 다 끝나면 결과 데이터를 그곳으로 출력한다.
정리하면,
1. 클라이언트 노드에서 작업을 개시하고 리소스 매니저는 다른 컴퓨터들의 가용 여부 등을 추적한다.
2. 애플리케이션 마스터는 작업 진행을 관리하고
3. 노드 관리자는 개별 PC를 관리한다.
4. 모두는 HDFS 클러스터의 데이터를 사용한다.
이 때 리소스 매니저가 매핑이나 리듀싱 작업을 최대한 데이터와 가까운 곳에서 실행한다.
예를 들어 입력 데이터의 일부를 매핑한다고 할 때 HDFS에 데이터가 입력되면 블록으로 나누고 두세 개의 노드에 걸쳐 복사해 저장하는 것이다. 그러면 리소스 매니저는 입력 데이터에 대한 매핑 작업을 웬만하면 그 데이터 블록을 가지고 있는 머신이 수행하도록 조율한다.
만약 불가능하다면 네트워크에 최대한 가까이 두려고 노력한다.
즉 불필요한 네트워크 전송을 줄이려는 것이다.
결론적으로 클러스터에 MapReduce 작업이 분배되며 하나의 물리적 컴퓨터에서 여러 작업을 실행할 수 있고 애플리케이션 마스터와 리소스 매니저가 여러 컴퓨터를 조정해 작업할 수도 있다.
Mapper와 Reducer
MapReduce는 원래 Java로 작성됐다. 사실 Hadoop이 Java로 작성된 것이다.
만약 MapReduce 애플리케이션을 손수 작성하고 싶다면 Java 프로그래밍 언어를 사용해 필요한 매퍼와 리듀서가 있는 jar 파일을 만들어야 한다.
하지만 java에 익숙하지 않은 사람이 많을 것이고 이 때 사용되는 것이 streaming process이다.
STREAMING은 Python 같이 간단한 언어로 MapReduce를 사용케 한다.
MapReduce의 STREAMING을 사용하면 표준 입출력을 사용해 매핑 작업을 시작할 수 있다.
진행 중인 프로세스와 소통하며 매퍼와 리듀서를 실행한다는 것이다.
Java에 매핑, 리듀싱 함수를 작성하는 대신 Python이나 컴퓨터 클러스터 노드에서 작동하는 프로세스에 대신 작성하고 표준 입출력을 사용해 소통한다. 표준 입력을 통해 데이터를 받아 매퍼를 실행하고 표준 출력으로 키-값 쌍을 받아낼 수 있다
결과적으로 다른 유닉스 도구들처럼 STREAMING을 통해 데이터를 처리하고 MapReduce가 원하는 결과를 생산한다.
Handling Failure
실패는 어떻게 처리할까? 이는 Hadoop과 MapReduce에서 굉장히 중요한 기능이다.
범용 PC로 구성된 거대한 클러스터의 단점은 범용 하드웨어가 종종 다운된다는 점이며 이에 대비해야 한다.
1. worker에서 에러가 발생하면? - 재시작하기
예를 들어 작업을 관리하는 애플리케이션 마스터가 어떤 작업물에서 오류를 발견하면 간단히 그 작업을 재시작할 수 있다.
가능하면 다른 컴퓨터에서 그 작업을 재시도 할 것이다.
2. application master가 다운된다면? - yarn이 재실행
애플리케이션 마스터는 사실 리소스 매니저에 의해 돌아간다.
그래서 애플리케이션 마스터가 다운되면 Hadoop의 구성요소이자 또 다른 리소스 교섭자인 YARN이 이걸 재시작한다.
리소스 매니저인 YARN이 애플리케이션 마스터를 감시하는 것이다
3. 전체 노드가 다운되거나 클러스터의 모든 PC가 다운되면?
리소스 관리자가 다시 가능한 다른 PC에서 작업을 재시작하려고 할 것이다.
각각의 컴퓨터는 데이터의 일부만을 처리하기 때문에 소실된 부분의 매핑이나 리듀싱을 새 인스턴스에서 쉽게 재시작할 수 있다.
처음부터 다시 시작할 필요가 없다.
4. 리소스 관리자가 다운되면?
이러한 경우를 위해 HA(high availability - 고가용성)을 세팅할 수 있다. zookeeper을 활용하여 stanby된 백업 리소스 매니저를 두는 것이다. 동적 예비 리소스 관리자를 유지하는 것이다.
이렇게 되면 MapReduce는 먼저 Zookeeper와 얘기해 어떤 리소스 매니저를 사용할지 알아내고 그 리소스 매니저가 다운되면 Zookeeper는 자동으로 두 번째 백업 리소스 매니저를 가리킨다.
카운터(counter) : 클러스터 전반에 걸쳐 공유된 총 수를 유지하는 기능
컴바이너(combiner) : 매퍼 노드를 줄여 최적화해 간접비를 줄인다.
사실 예전만큼 MapReduce를 많이 사용하지 않는다. 알다시피 클러스터에 SQL 스타일의 쿼리를 허용하는 Hive나 Spark 같은 고수준 도구로 대부분 대체됐다
Python - MR Job 예시
이는 Python에서 MapReduce를 작성하기 위한 방법에 대한 코드 예시이고 STREAMING 인터페이스를 다루는 복잡함을 희석시킨다.
이 패키지를 사용하기 위해서는 'mrjob' 라이브러리에서 'MRJob'과 'MRStep'을 사용한다.
'RatingsBreakdown'이 'mrjob' 패키지의 'MRJob' 클래스를 이어받는다.
그러니까 'RatingsBreakdown' 클래스가 'MRJob'에 정해진 기능을 다 사용할 수 있다
그 안에는 세 가지의 함수가 있고, 이게 하는 일은 이 작업에 어떤 함수가 매퍼와 리듀서로 사용되는지 프레임워크에 정의한다.
여길 보면 하나의 'steps'을 갖겠다는 말이고 이건 단일 MapReduce 단계를 갖겠다는 말이다.
매퍼는 'mapper_get_ratings' 함수를, 리듀서는 'reducer_count_ratings' 함수를 사용한다.