WIR(What I Read)

[하둡 완벽 가이드]Ch07. 맵리듀스 작동방법

Hazel_song 2022. 10. 23. 21:47
728x90

하둡 완벽가이드를 읽으며 정리한 내용입니다.


7.1 맵리듀스 잡 실행 상세 분석

  • 클라이언트 노드에서 맵리듀스 잡을 제출
  • YARN 리소스 매니저
    • 클러스터 상에 계산 리소스의 할당을 제어
  • YARN 노드 매니저
    • 각 머신에서 컨테이너를 시작하고 모니터링
  • MRAppMaster
    • MR잡을 수행하는 각 태스크를 제어한다.

AppMaster와 MR task는 컨테이너 내에서 실행되며 리소스 매니저는 잡을 할당하고 노드 매니저는 태스크를 관리하는 역할

  • 공유 파일 시스템
    • 서로 다른 단계 간의 잡 리소스 파일들을 공유하는데 사용된다.(보통 HDFS)

 

7.1.1 잡 제출

클라이언트 노드에서 맵리듀스 잡을 제출할 때의 과정을 상세하게 정리

  1. 내부의 JobSubmitter 인스턴스 생성
  2. 리소스 매니저에 MR 잡 ID로 사용될 새로운 애플리케이션 ID요청
  3. 잡 출력 명세 확인
    • 예를 들어 디렉터리가 지정되지 않았거나 이미 존재한다면 해당 잡은 제출되지 않고 에러
  4. 입력 스플릿 계산. 즉 실질적으로 수행 될 작업 단위를 계산한다.
  5. 잡 실행에 필요한 잡리소스에 대한 정보가 기재된 파일을 공유 파일시스템에 있는 해당 잡 ID이름의 디렉터리에 복사
    • 복제인수는 mapreduce.client.submit.file.replication 속성에 정의하며 기본값은 10으로 설정
    • 클러스터 상에 복제본이 많으면 노드 매니저가 잡의 태스크를 실행할 때 접근성이 높아지는 장점
  6. 리소스 매니저의 submitApplication()을 호출하여 잡 제출
  7. waitForCompletion() 메서드가 1초에 한번씩 잡의 진행상황을 조사하여 변경내역 보여줌.

 

7.1.2 잡 초기화(태스크 실행방식 결정)

위의 단계를 통해, 리소스 매니저가 호출을 받게되고 YARN 스케줄러에 요청을 전달한다. 

스케줄러가 컨테이너를 할당하면, 컨테이너를 시작하고 노드 매니저의 애플리케이션 마스터 프로세스를 시작한다. (그림의 5a~5b 단계)

 

애플리케이션 마스터는 잡을 초기화 할 때, 잡의 진행상태를 추적하기 위한 다수의 bookkeeping 객체를 생성한다.

그리고 각 태스크로부터 진행 및 종료 보고서를 받는다.(그림의 6단계)

 

잡 제출 당시에 복사한 입력 스플릿 리소스에 대한 정보를 공유파일 시스템으로부터 추출한다.(7단계)

  • 해당 파일 정보를 바탕으로 맵 태스크 객체를 생성하고 지정한 리듀서 수만큼 맵 태스크 객체를 생성하며, 각 태스크가 ID를 부여받게 된다.

이 때 애플리케이션 마스터는 실질적인 태스크 실행을 위해, 그 실행 방식을 고민하고 결정해야한다.

  • 잡의 크기가 작다면 본인의 JVM에서 실행한다. 즉 병렬처리를 위해 새로운 컨테이너에 태스크를 할당하는 등의 오버헤드가 발생하는 것보다 단일 노드에서의 순차처리가 더 유리하다고 판단한 것이다. (우버 태스크로 실행했다고 한다)
  • 잡의 크기가 작다는 기준은? HDFS 블록 하나보다 작은 크기의 입력, 혹은 10개 미만의 매퍼

 

7.1.3 태스크 할당

만약 잡을 우버 태스크로 실행하지 않는다면, 애플리케이션 마스터는 리소스 매니저에 추가 컨테이너를 요청한다.(8단계)

  • 이때 맵태스크 요청이 리듀스 태스크 요청보다 우선순위가 높은데, 이는 리듀스의 정렬단계가 시작되기 전에 모든 맵태스크가 완료되어야 하기 때분이다.
  • 참고로 모든 맵 태스크의 5%가 완료되기 전까지 리듀스 태스크의 요청은 처리되지 않는다(10.3.5 절의 느린 리듀스 시작 참고)

리듀스 태스크와 달리(클러스터 어느곳에서도 실행 가능) 맵 태스크는 데이터 지역성 제약이 있다. 최적의 상황은 입력 스플릿이 저장된 노드에서 맵 태스크가 실행되는 데이터 로컬일 때다.

그 다음 대안으로는 입력 스플릿과 동일한 랙이지만 노드는 다른 랙 로컬일때다.

 

그리고 이렇게 리소스를 요청할때 필요한 메모리 요구사항과 CPU 수를 명시한다.

→ 기본적으로 1,024MB 메모리와 가상코어 1개

 

7.1.4 태스크 실행

위의 과정을 통해 리소스를 태스크에 할당하면 애플리케이션 마스터는 노드 매니저와 통신하며 컨테이너를 시작한다(9a~9b 단계)

본격적으로 태스크 실행하기 전에 필요한 리소스(환경 설정, JAR파일, 분산 캐시와 관련된 파일 등)를 로컬로 가져오고(10단계),

그 다음에 맵과 리듀스 태스크를 실제 실행한다.(11단계)

  • 위의 그림을 참고하면 노드 매니저 노드 내에서 각 태스크가 실행되는 YarnChild는 자체 전용 JVM에서 실행되므로 이에 대한 버그는 노드 매니저에 영향을 주지 않는다.

앞에서 임시 작업 공간에 저장한 태스크 출력을 최종 위치로 옮긴다. (7.4.3절 출력 커미터 참고)

커밋 프로토콜은 투기적 실행이 활성화 됐을때 중복 태스크 중 단 하나만 커밋하고 나머지는 버린다(7.4.2 투기적 실행 참고)

스트리밍 태스크

사용자가 제공한 스트리밍 실행파일을 시작하고 통신하기 위한 맵과 리듀스 태스크가 실행된다.

이러한 스트리밍 태스크는 표준 입출력 스트림을 통해 프로세스와 통신한다.

 

7.1.5 진행상황과 상태갱신

맵리듀스 잡은 시간이 오래 걸릴 수 잇으므로 사용자가 진행상황을 피드백 받고 이를 판단하는 것이 중요하다.

 

맵 태스크의 경우 처리한 입력 데이터의 비율을 진행상황으로 보고 추적한다.

 

리듀스 태스크는 리듀스가 처리한 입력 데이터의 비율을 시스템이 추정하는데, 이러한 추정을 위해서 전체 진행과정을 총 세부분으로 나누게된다. 예를 들어 리듀서에서 입력의 절반을 처리했다면 태스크의 진행상황은 5/6이 된다. 이는 복사와 정렬 단계가 각각 1/3씩 완료되었고 리듀스 단계의 절반(1/6)이 진행되엇기 때문이다.(셔플의 단계와 관련)

-> 즉 리듀스 태스크는 copy, sort, reduce  세 단계로 태스크를 나누고 이 기준으로 진행 상황을 추적한다.

 

💡 진행 중임을 판단하는 동작
- 입력 레코드 읽기
- 출력 레코드 쓰기
- 상태 명세의 설정
- 카운터 증가

태스크는 프레임워크 같은 것을 통해, 수행 중인 여러 이벤트를 세는 카운터를 실행하고, 본인이 실행 중이라는 상태를 애플리케이션 마스터에 보고한다.

 

7.1.6 잡 완료

애플리케이션 마스터가 마지막 태스크 완료를 통지 받으면 잡의 상태를 성공으로 변경하고 종료

 

7.2 실패

하둡에서 맵리듀스 잡의 실패에 대한 대응 정책

 

7.2.1 태스크 실패

태스크가 아예 장애로 작업을 실행하지 못하는 경우는 애플리케이션 마스터에서 리소스를 다른 태스크에서 실행하게 된다.

 

행이 걸린 즉 멈춘 태스크의 경우, 바로 실패로 처리되는게 아니라 특정 시간 동안 진행상황을 갱신 받지 못한 경우 애플리케이션 마스터에서 실패로 표시한다.

→ 이 특정 시간 즉 타임아웃은 사용자가 설정 가능한데 0으로 설정하면 아예 비활성화가 된다.

따라서 아무리 긴 시간동안 실행되지 않아도 멈추지 않을 것이고 이는 클러스터를 느리게 만든다.

 

또한 태스크가 실패하더라도 당장 잡이 중단되는 것은 원치않을 수 있으므로 잡이 실패하지 않는 태스크 실패 허용 비율을 잡에 설정할 수 있다. 이 때 태스크의 강제 종료는 실패에 포함하지 않는다.

 

7.2.2 애플리케이션 마스터 실패

애플리케이션 마스터가 실패하면 재시도를 하게 되는데 이 또한 사용자가 설정할 수 있다.

설정한 횟수만큼 시도한 다음에도 복구되지 않으면 잡은 실패로 끝난다.

 

그렇다면 재시도 과정에서 복구 작업 방식은 어떨까?

애플리케이션 마스터는 주기적으로 리소스 매니저에 하트비트를 보내고, 실패시에 리소스 매니저가 이를 감지해서 새로운 컨테이너에서 새로운 마스터 인스턴스를 시작한다.

이때 잡 히스토리를 사용해서 실패한 애플리케이션에서 이미 실행된 모든 태스크의 상태를 복구하므로 재실행할 필요가 없다.

 

만약 클라이언트가 진행상황 보고를 위해 폴링하고 있는 애플리케이션 마스터가 실패한다면 클라이언트는 새로운 애플리케이션 마스터 인스턴스의 위치를 알아내야 한다.

이를 위해 리소스 매니저에 애플리케이션 마스터의 주소를 요청하고, 이를 캐시하여 이후에는 폴링할때마다 리소스 매니저에 요청하지 않도록 한다.

 

7.2.3 노드매니저 실패

리소스 매니저에서 특정 노드매니저가 (설정 가능한, 기본적으로 10분)특정 시간동안 하트비트를 전송하지 않음을 인지하면, 컨테이너를 스케줄링하는 노드 풀에서 해당 노드 매니저를 제거한다.

 

만약 실패한 노드 매니저에서 맵태스크가 실행됐다면, 애플리케이션 마스터는 이 태스크를 재 실행한다.

-> 즉 실패한 노드 매니저 위에 돌아가는 태스크 포함 애플리케이션 마스터 모두 recover한다.

 

애플리케이션 실패 횟수가 높으면 노드 매니저에 문제가 없어도 노드 매니저가 블랙리스트에 등록된다.

 

7.2.4 리소스 매니저 실패

우선 “굉장히” 심각한 상황! 왜냐면 리소스 매니저 없이 잡이나 태스크 컨테이너가 실행될 수 없기 때문이다.

따라서 고가용성(HA)을 위해 설정이 필요하다.

 

이를 위해 두 개의 리소스 매니저를 활성대기 설정으로 실행한다.

실행중인 애플리케이션에 대한 정보는 고가용 상태 저장소(주키퍼 혹은 HDFS)에 보관되기 때문에 대기 리소스 매니저는 실패한 리소스 매니저의 핵심 상태를 복구할 수 있다.

 

노드 매니저 정보는 상태저장소에 보관되지 않는데 이는 노드매니저가 새로운 리소스 매니저에 첫번째 하트비트를 보낼때 새로운 리소스 매니저가 이를 바탕으로 상대적으로 빠르게 재구축할 수 있기 때문이다.

 

리소스 매니저가 대기 → 활성상태로 전환하는 것은 장애극복 관리자가 담당한다.(failover controller)

장애 극복 관리자는 주키퍼 대표자 선출을 사용해서 어떤 시점에라도 단일 활성 리소스 매니저가 존재하도록 보장한다.

→ 이러한 장애극복 관리자는 독립 프로세스일 필요는 없으므로 기본적으로 리소스 매니저에 포함된다.

 

노드 매니저의 입장에서는 새로운 리소스 매니저를 찾을때까지 라운드 로빈 방식으로 각 리소스 매니저에 연결을 시도한다.

-> 노드 매니저가 각 리소스 매니저에 일정 시간 동안 계속해서 연결을 시도하면서 연결되는 리소스 매니저를 찾는다??

 

7.3 셔플과 정렬

  • 셔플 : 정렬을 수행하고 맵의 출력을 리듀서의 입력으로 전송하는 과정

 

7.3.1 맵 부분

맵 함수가 결과를 생산할 때 실질적으로는 단순히 디스크에 쓰는 것은 아니다. 효율적인 처리를 위해 메모리에 일정 크기만큼 쓴다음 사전 정렬을 수행한다.

메모리 버퍼는 환형 구조이며, 기본적으로 100MB인데 이 또한 변경 가능하다.

버퍼의 내용이 특정 한계치(기본적으로 80%, 이 또한 설정 가능)에 도달하면 백그라운드 스레드가 디스크에 스필(각 파티션별 메모리 버퍼를 디스크에 한번에 쓰는 것)한다. 이 때 데이터를 최종적으로 전송할 리듀서 수에 맞게 파티션한다.

-> 해당 파티션 기준으로 후에 merge 된다.

이 과정에서도 맵 결과는 계속 버퍼에 쓰이는데 버퍼가 가득차면 맵은 스필이 종료될때까지 블록 된다.

 

스필 한계치에 도달하면 새로운 스필파일이 생성된다. 따라서 맵 태스크가 최종결과 레코드를 쓰고나면 여러개의 스필 파일이 존재할 수 있다. 태스크가 종료되기전에 이러한 여러 스필 파일은 단일 출력 파일로 병합되고 정렬된다.

→ 한 번에 병합할 최대 스트림 수를 조절할 수 있으며 기본 값은 10이다.

 

맵 출력 파일을 효율화 하기 위해 컴바이너가 실행되는데, 이 기준은 스필 파일이 세개를 넘느냐 아니냐이다.

즉 스필파일이 세개를 넘으면 출력을 축소하기 위해 컴바이너가 실행된다.

그러나 한개나 두개일 경우에는 컴바이너를 호출하는거 자체가 오버헤드이므로 실행하지 않는다.

 

이렇게 맵출력을 디스크에 쓰려는 시점에 압축을 실행한다. 이를 통해 디스크 공간을 절약하며 리듀서로 전송할 데이터양을 줄일 수 있다.

→ 기본적으로는 출력 당시에 압축이 실행되지 않지만, 압축하도록 설정할 수 있으며 이때 압축 라이브러리 또한 지정할 수 있다.

맵태스크를 진행하며 파티션을 나누었던 파티션 정보는 HTTP를 통해 리듀서에 전달된다.

 

7.3.2 리듀스 부분

리듀스 테스크는 클러스터 내에 퍼져있는 여러 맵 태스크로부터 특정 파티션에 해당하는 맵 출력들의 정보를 한번에 알아야 한다.

실제로 각기 다른 시간에 끝나는 맵태스크에 대해, 리듀스 태스크는 각 맵 태스크의 출력이 끝나는 즉시 각각 병렬로 복사를 실행한다.

💡 맵 태스크가 성공적으로 종료되면 하트비트 전송 메커니즘을 통해 애플리케이션 마스터에 맵출력의 서버를 알려준다.
그렇다면 애플리케이션 마스터는 맵출력과 호스트 사이의 매핑 정보를 알고 있게 된다.
이에 대해 리듀스 내의 한 스레드가 맵 출력 호스트 정보를 주기적으로 마스터에 요청한다.

이때 호스트는 첫 번째 리듀서가 맵 출력을 회수하더라도 디스크에서 그들을 삭제하지 않는데. 이는 리듀스가 회수에 실패할 수 있기 때문이다. 애플리케이션 마스터로부터 삭제 요청을 받을 때까지 기다린다.

 

이 때 맵출력의 크기가 작다면 리듀스 태스크 JVM메모리에 복사된다.

이 또한 맵태스크처럼 인메모리 버퍼가 한계치에 다르면 병합되어 디스크에 스필된다.

디스크에 축적되면 백그라운드 스레드가 이를 정렬된 형태의 파일로 병합한다. 만약 압축된 맵 출력이라면 병합을 위해 메모리 내에서 압축을 풀어야 한다.

 

맵 출력이 복사되는 시점에 리듀스 태스크는 정렬 혹은 병합을 실행한다.

→ 이 작업은 라운드 단위로 이루어 지는데, 가령 맵출력이 50개이고 병합 계수가 10이라면 5개의 라운드로 구성된다. 이를 통해 5개의 중간 파일이 생성된다.

 

이렇게 만들어진 파일들은 하나의 정렬된 파일로 병합하는 최종 라운드를 가지는 대신 마지막 단계(리듀스 단계)의 리듀스 함수에 곧바로 전송하여 디스크 IO를 줄인다. 

-> 이 부분이 이해가 잘 안된다

 

❓ 사실 각 라운드에서 병합되어 만들어지느 파일 수는 좀 더 미묘하다. 궁극적인 목적은 최종 라운드에서 병합 계수에 도달하기 위한 최소한의 파일을 병합하는 것이다. 따라서 예시처럼 40개의 파일일때 10개씩 병합하여 파일 4개를 얻는 것과는 실제는 다른다.

처음에는 4개만 병합하고, 이어서 라운드 3개에서 10개씩 병합하면, 네개의 병합된 파일과, 병합되지 않은 여섯개의 파일을 합쳐서 10개의 파일이 생성된다.

 

7.3.3 설정 조정

맵리듀스 성능향상을 위해 셔플 튜닝에 대한 설정을 해줄 수 있는데 일반적으로는 셔플에 가능한 한 많은 메모리를 할당하는 것이 성능향상을 위해 좋다.

그러나 맵과 리듀스 함수가 동작하는데에도 충분한 메모리 확보가 필요하므로 해당 함수들이 동작할 때 무한정으로 메모리를 사용하지 않도록 해야 셔플에 메모리를 가능한 많이 할당할 수 있다.

 

맵 측면에서는 다수의 디스크 스필을 피하는 것이 최고의 성능을 내는 방법이다.

리듀스 측면에서는 중간 데이터 전체가 메모리에 존재할 때 최고의 성능을 얻을 수 있다. 일반적으로는 모든 메모리를 리듀스 함수에 예약해두므로 중간 데이터를 보관할 메모리를 할당하는 것이 힘드나, 리듀스 함수가 조금의 메모리만 요구할때 이러한 설정이 가능하여 성능향상을 꾀할 수 있다.

-> 이 부분이 이해가 잘 되지 않는다. 중간 데이터는 어떤 데이터이며, 맵태스크를 완료한 맵출력 데이터인건가? 그리고 데이터가 메모리에 존재하는데 어느 메모리에? 리듀스 내 JVM 메모리? 

-> 위에서 언급된, "맵출력의 크기가 작다면 리듀스 태스크 JVM메모리에 복사된다. 이 또한 맵태스크처럼인메모리 버퍼가 한계치에 다르면 병합되어 디스크에 스필" 이 부분과 관련된건가? 즉 디스크에 스필 되지 않고 최대한 많은 데이터가 메모리에 잔존하는 것이 최고의 성능을 얻는 방법이라는 건가?

 

7.4 태스크 실행

해당 절에서는 태스크 실행에 관한 제어 사항에 대해 정리

주요 개념은 투기적 실행과 출력 커미터

 

7.4.1 태스크 실행 환경

하둡은 맵, 리듀스 태스크에 실행환경에 관한 정보를 전달해주며, 이 정보에서 처리할 파일명이나 태스크 시도 횟수등을 알 수 있다.

7.4.2 투기적 실행

맵리듀스 모델은 잡을 태스크로 나누고 태스크를 병렬 수행하는 형태다.

 

이 때 하나의 느린 태스크가 있는 경우, 전체 잡 수행을 상당히 지연시킬 수 있다.

이때 하둡은 느린 태스크를 진단하거나 고치려하지 않고, 태스크 수행이 예상했던 것보다 더 느리게 수행된 상황을 감지하여 비슷한 설정을 지닌 동일한 태스크를 새로 실행한다. 이를 태스크의 투기적 실행이라고 한다.

→ 즉 이름때문에 두개의 복제 태스크를 경쟁하도록 하는 실행이 아니다.

 

즉 복제 태스크가 원본 태스크와 함께 같이 실행되고, 원본이든 복제든 하나의 태스크가 성공적으러 완료되면 실행 중인 중복 태스크는 더이상 필요없으므로 강제 종료된다.

투기적 실행은 안정적인 잡 실행이 아닌 일종의 최적화가 목적이다.

중복된 데이터를 복사하여 실행함으로써 더 많은 리소스를 사용하며, 결국 성공한 하나의 태스크가 아니면 버려지지만, 동일한 태스크를 여러 노드에서 실행함으로써 특정 노드가 느리더라도(장비노후나 특정 문제로) 다른 노드에서 먼저 끝나면 해당 결과를 사용할 수 있게 된다. 즉 특정 노드에서 발생할 수 있는 버그를 제거함으로써 전체적으로는 수행 시간이 단축 되는 것이다.

 

즉 어떠한 태스크가 버그로 인해 느리다고 이를 피하기 위해 복제 태스크를 실행하고 이를 의존하는 것은 옳지 못하다. 복제 태스크 또한 원본과 동일한 버그에 영향을 받을 가능성이 있기 때문이다.

 

투기적 실행의 궁극적인 목적은 잡 실행시간을 줄이는 최적화지만, 사실 클러스터 효율성 측면에서는 복제 태스크가 추가로 실행되는 것이므로 비용이 발생한다.

 

리듀스 태스크에서는 투기적 실행을 끄는 것이 좋은 예다. 복제 리듀스 태스크를 실행하려면 원본과 동일한 맵출력을 인출해야하며 이는 클러스터에서 네트워크 트래픽을 심각하게 증가시킨다.

 

7.4.3 출력 커미터

잡과 태스크가 깨끗하게 성공하거나 실패하도록 보장하기 위한 커밋 프로토콜을 사용한다.

이 동작은 OutputCommitter로 구현했다.

  • 잡이 실행되면 최종 출력 디렉터리와 태스크 출력을 위한 임시 작업공간을 최종 출력 디렉터리의 서브 디렉터리로 생성한다.
  • 그리고 이 잡이 성공하면 임시 작업 공간을 삭제하고 파일시스템 클라이언트에 잡이 성공적으로 완료됐음을 알린다.
  • 실패한 경우도 비슷한데 이때는 실패를 알린다.

특정 태스크에 대한 여러 태스크 시도 중에서 단 하나만을 커밋하며 나머지는 중단시킨다. 만약 두개의 태스크 시도가 투기적 중첩으로 인해 동시에 실행되면 먼저 완료된 태스크가 커밋되고 나머지는 중단된다.

각 작업에 대한 결과를 커밋하여 최종적으로 성공인지 실패인지 명확하게 전달할 수 있게 하는 것이다.

 

태스크의 부차적인 파일

태스크의 출력을 작성하는 일반적인 방법은 키-값 쌍을 수집하는 OutputCollector를 사용하는 것이다.

몇몇은 키-값 쌍 모델보다 더 유연한 모델을 필요로 하기 때문에 직접 HDFS와 같은 분산 파일 시스템에 출력 파일을 작성한다.

 

이때 동일 태스크의 다중 인스턴스가 동일한 파일에 쓰이지 않아야 하는데, OutputCommitter 프로토콜이 이를 해결해준다.

애플리케이션이 태스크 작업 디렉터리에 부차적인 파일을 작송하고 성공한 것은 출력 디렉터리에 옮겨지고 실패한 태스크는 삭제될 것이기 때문이다.

 

 

Reference

728x90