프로그래머스[데이터 엔지니어링 스타터 키트]필기 : ETL&Airflow
데이터 파이프라인이란?
Data Warehouse VS Data Lake
DW에 모든 데이터를 정말로 저장하진 않는다(비구조화된 데이터는 저장할 수 없다 / 비용문제)
-> 최근 1~2년 데이터 중에서 구조화된 데이터만 저장. DL에는 모든 데이터를 저장(대표적으로 S3)
-> DW 앞단에서 모든 데이터를 저장하고, 이 중에서 선별해서 DW에 저장.
->source에서 로그 태깅을 한걸 기준으로 파티셔닝해서 저장
->메타 데이터들을 spark로 데이터 프레임 이름을 주고, 네이밍에 따라서 적재한다.
-> 보통 하둡/하이브 쓸때 external table 만들어서 사용
-> 일반적인 분석용 데이터는 DW에서
-> DS에서 DL로 갈때 FluentD, LogStash, Kafka,airflow 해당 툴 중에 하나를 사용
분리하면 좋은 점은?
DL에는 다양한 종류의 raw data가 시간 제약 없이 존재한다. 따라서 이를 기반으로 머신러닝 모델링도 가능해진다.
웹서버 로그 같은 것도 저장할 수 있다. DW에서는 선별된 데이터로 분석용으로 많이 활용된다.
-> 좀더 회사 규모가 커지면 AB테스트에서 데이터 업무를 많이 하게 된다. 실제 프로덕션 데이터 업무도 하게 된다.
-> 트래픽이 엄청 늘어서 실제 데이터 베이스에도 무리가 오는 경우,
-> DW를 실제 프로덕션 환경에서 사용하게 될수도 있다.(성능 문제 때문에)
-> 실시간이 아니어도 되는 경우, DW에서 주기적으로 copy가 되고 있기 때문에 사용할수 있다.
-> Data Lake의 데이터를 데이터 분석용으로 바로 활용하기에는 문제가 있고, transform하는 과정이 필요하다. 보통 이곳에 쌓이는 raw data는 ML 업무 등에 활용된다.
Data Pipeline에 대해
테이블에 중복된 데이터를 넣지 않기 위해서는 삭제->새로 생성하는 방법을 써야한다.
그러나 이런 경우 삭제 후에 생성 과정에서 에러가 발생한 경우 문제가 생긴다.
따라서 transaction 처리가 필요하다. 즉 삭제-> 생성이 완전히 완료되지 않으면 삭제 조차도 처리되지 않도록
autocommit = True를 포함한다.
sql 안에 쿼리 앞과 뒤에 begin/end를 넣어주는게 transaction이다.
Airlow
-> 자체는 스케줄러고, 특정 툴을 실행시키기도 한다.
-> bash operator로 특정 프로그램과 연결 시켜서 airflow 내에서 실행해서 스케줄러를 관리하고 실행결과를 지켜보기 용이하다.
-> spark job을 airflow내에서 실행하는 경우, spark job을 단순히 실행하는 용도로 사용되기도 한다.
-> 보통 airflow 에 대한 데이터도 저장되어야하므로 데이터 베이스를 같이 설치한다.
-> ETL 관리, 실패에 대한 모니터링 및 재실행 용이
-> 자체적인 서버와 db를 가진다.
-> 데이터 양이 많아지면 airflow 서버를 scale up/out을 해야한다.
->프로덕션 데이터베이스는 scale up이 일반적이지만 airflow는 scale out(기본적으로 worker server 추가)이다.
단점
초반 러닝커브가 크다. cluster모드 즉 서버가 여러개가 되면 어렵다.
이럴때는 그냥 클라우드에 잇는 airflow를 쓰는게 좋다.(구글 클라우드의 GCP)
-> AWS 것보다 훨씬 낫다.
DAG란?
airflow 모듈 중 하나이다. 이는 일종의 데이터 파이프라인이다.
하나 이상의 task로 구성되어 있다. 일종의 job trigger(glue에 빗대면)
실제로 workflow의 개념이라고 보면된다.
-> "dag_v1" 이 ui에도 보여주는 해당 DAG의 이름이다.
-> schedule_interval 은 주기이다.
-> start_date는 태스크가 실행되는 시작 날짜가 아니라, 어느 시점의 데이터 부터 입력할 것인지이다. 즉 11월 7일이라고 입력하면 11/8 부터 실행되고 11/7 데이터부터 입력된다.
-> t1 >> t2 : t1 태스크 이후에 t2태스크를 실행해라
-> set_upstream 으로도 표현 가능하다
-> 그래프 구조/방향이다. 따라서 순환 구조를 지원하지 않는다.
-> start/end같은 dumyoperator는 각 테스크의 정확한 시작과 끝을 나누고 싶을때 사용. 이는 실제로 아무 일도 일어나지 않는다.
->bash 보단 pythonoperator를 많이 사용한다.
데이터 파이프라인 주의사항
* 데이터가 작으면 통ㅈ채로 복사하고 새로 만들어주는 것이 좋다. 데이터가 수정되는 경우, 필드 내의 created/modified/deleted 필드가 필수이며 이를 바탕으로 업데이트 되어야 한다. -> 해당 필드를 기준으로 update / incremental update가 매우 복잡하다
* 중복데이터가 발생하지 않아야 한다.
* incremental update를 하게 되면 실패한 데이터 파이프라인 재실행 즉 backfill이 복잡해 진다.
* 사고마다 리포트 쓰기
* 중요 데이터 파이프라인의 입출력 체크하기
* 주기적으로 쓸모없는 데이터를 삭제(DW에서 안쓰는 데이터도 다시 DL로 이동)
start_date/execution_date(start_date + frequency)
-> catch_up = True : start_date 부터 다 실행한다. false면 과거 실행한 데이터는 실행하지 않는다. (catch_up은 false 인게 안전)
-> 따라서 airflow의 start_datetime은 DAG의 실행 시작 날짜가 아니라 읽어오기 시작할 데이터의 날짜이다.