7. ETL/Airflow 소개
Spark/Athena 사용 시나리오
Spark나 Athena를 쓰는 경우는 대부분 데이터가 비구조화 데이터인 경우 or 데이터 사이즈가 큰 경우 (ex. 사용자 이벤트 데이터)
데이터 웨어하우스의 스토리지는 비쌈 → 값이 싼 스토리지에 먼저 저장 (데이터 레이크)
데이터 레이크 : 스토리지 가격이 싸서 비구조화 데이터를 저장하기 좋음
데이터 웨어하우스 : 스토리지 가격이 비싸 구조화된 데이터 위주로 저장
Athena : SQL로 데이터들을 처리
Spark : Pandas의 데이터 프레임처럼 처리할 수도 있고 SQL 형태로 처리할 수도 있음
Athena/Spark -> 큰 데이터를 Adhoc하게 사용할 수 있게함
ML Model에 필요한 input을 동적으로 계산할 수도 있지만 너무 느려짐
input feature가 실시간으로 계산하기 어려운 경우 다른 스토리지에 저장해 놓고 시간 마다 업데이트
데이터 파이프라인이란?
ETL
- 데이터 소스에서 데이터를 추출해서 우리가 원하는 포맷으로 변환 후 데이터 웨어하우스에 적재하는 것
- 외부에 있는 데이터를 변환하는 것
- 내가 데이터가 없어 새로운 데이터를 읽어야 할 경우
ELT
- Summary table을 만드는 과정
- 이미 데이터가 시스템 안에 들어와 있고 이 테이블을 JOIN하여 새로운 테이블을 만드는 것
- 소스가 데이터 웨어하우스 일수도 있고 데이터 레이크 일수도 있음
- 이미 데이터 레이크, 데이터 웨어하우스에 있는 데이터를 Summary해서 원하는 정보를 만들어 다시 데이터 웨어하우스에 적재하거나 데이터 마트에 저장
데이터 레이크
- 크고 싸게 데이터를 저장, 비구조화, Scale
데이터 웨어하우스
- 기본적으로 SQL 엔진, 모든 데이터가 테이블로 존재
데이터 파이프라인의 정의
데이터를 소스로부터 목적지로 복사하는 작업
좁은 의미 : ETL, 외부 데이터를 읽어서 데이터 웨어하우스에 적재하는 것
넓은 의미 : 데이터 소스가 데이터 웨어하우스 자체가 되고 목적지도 데이터 웨어하우스가 됨. 소스가 데이터 웨어하우스이고 목적지가 프로덕션 DB나 외부 시스템일 수 있음
데이터 파이프라인 예시1
Raw Data ETL Jobs
1. 외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨)
2. 적당한 데이터 포맷 변환 후 (데이터의 크기가 커지면 Spark 등이 필요해짐)
3. 데이터 웨어하우스 로드
이 작업은 보통 데이터 엔지니어가 함
데이터 파이프라인 예시2
Summary/Report Jobs
1. DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
2. Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시 만드는 용도
3. 특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
요약 테이블의 경우 SQL (CTAS를 통해)만으로 만들고 이는 데이터 분석가가 하는 것이 맞음. 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수 있는 환경을 만들어 주느냐가 관건
데이터 파이프라인 예시3
Production Data Jobs
1. DW로부터 데이터를 읽어 다른 스토리지(많은 경우 프로덕션 환경)로 쓰는 ETL
a. 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우
b. 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산해두는 경우
2. 이 경우 흔한 타겟 스토리지
a. 카산드라/HBase/DynamoDB와 같은 NoSQL
b. MySQL과 같은 관계형 데이터베이스(OLTP)
c. Redis/Memcache와 같은 캐시
d. ElasticSearch와 같은 검색엔진
ex. 인기강의 -> 실시간으로 정확할 필요는 없음. DW에서 매시간마다 요약해서 뿌려주기
Airflow 소개
Airflow
- 파이썬으로 만들어진 데이터 파이프라인 플랫폼
- 에어플로우에서 데이터 파이프라인은 DAG 라고 함
- DAG들은 task들로 이루어져 있음
- 에어플로우도 클러스터임 (한 대에서 돌릴 수도 있고 여러 대에서 돌릴 수도 있음)
5개의 컴포넌트로 구성
1. Web Server - Python Flask로 구현됨
2. Scheduler - DAG마다 언제 실행되는지. 실행 스케줄
3. Worker - DAG에 속한 task들을 실행시켜 주는 것 → 서버수가 늘어나면 worker가 늘어나는 것. 더 많은 DAG를 실행할 수 있음
4. Database - SQLite가 기본으로 설치됨
5. Queue - 멀티노드 구성인 경우에만 사용됨. 다수의 서버에 DAG를 분산해서 실행. 이 경우 Executer가 달라짐(CeleryExecuter, KubernatesExecuter)
웹서버와 스케줄러는 마스터 노드에 있고 worker의 수만 늘어남
worker와 마스터 노드가 같은 곳에 있는게 아니라 별개의 서버로 존재
스케줄러가 태스크를 스케줄할때 바로바로 worker에 나눠주는게 아니라 queue에 넣으면 worker들이 읽어가는 형태
DAG
데이터 파이프라인이 해야하는 일들을 task로 분리를 한 다음에 task들의 실행순서를 정함
Extract, Transform, Load 각각을 task로 놓고 dependency를 걸어둠
Task : Operator의 인스턴스. 범용적인 Operator라면 코딩할 일이 별로 없음
Airflow의 장점과 단점
장점
- 다양한 형태로 실행 순서를 정의할 수 있음
- 다양한 operator들을 통해 개발이 쉬움
- Backfill이 쉬움
- 이미 읽어온 데이터가 있는데, 그 데이터가 소스 쪽에서 바껴서 다시 읽어와야 하는 경우
- 데이터를 복사하고 과거 데이터들을 읽어오는 것
- 코드 변경없이 현재부터 미래까지 운영할 수 있고 과거에 문제가 생겼을 때 Airflow UI에서 해결 가능
단점
- 러닝 커브가 높음
- deploy나 test에 대해 잘 확립된 프로세스들이 있지는 않음
- 멀티 노드로 가는 순간 유지보수가 어려워짐
데이터 파이프라인을 만들 때 고려할 점
Best Practice 1
- 가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
- Incremental Update만이 가능하다면, 대상 데이터 소스가 갖춰야 할 몇 가지 조건이 있음
- 데이터 소스가 프로덕션 DB 테이블이라면 다음 필드가 필요 → created, modified, deleted
- 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야 함
Best Practice 2
- 멱등성을 보장하는 것이 중요
- 동일한 입력 데이터로 데이터 파이프라인을 여러번 실행해도 최종 테이블의 내용이 달라지지 않아야 함
- 중복 데이터가 생기지 않아야 함
Best Practice 3
- 에어플로우는 Backfill에 강점을 가지고 있음
- DAG의 cacheup 파라미터가 True가 되어야 하고 start_date와 end_date가 적절하게 설정되어야 함
- Backfill은 대상 테이블이 incremental update인 경우에만 의미가 있음
- execution_date 파라미터를 사용해서 업데이트되는 날짜 혹은 시간을 알아내게 코드를 작성해야 함(현재 시간을 기준으로 업데이트 대상을 선택하는 것은 안티패턴)
Best Practice 4
- 데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
- 주기적으로 쓸모없는 데이터들을 삭제
Best Practice 5
- 데이터 파이프라인 사고시 마다 사고 리포트 쓰기
- 중요 데이터 파이프라인의 입력과 출력 체크하기
Airflow의 Backfill 방식
start_date : DAG가 incremental update를 한다는 가정하에서 읽어와야하는 데이터의 날짜
Airflow의 접근 방식
- 모든 DAG의 실행에는 execution_date가 지정되어 있음
- execution_date로 채워야하는 날짜와 시간이 넘어옴
- 어느 날짜의 데이터를 읽어와야 할지 현재 시간을 보고 결정하지 않고 execution_date를 봐야함(현재 시간 기준으로 만들면 Backfill이 안됨)
'Data Enginerring > 데이터 엔지니어링 스타터 키트' 카테고리의 다른 글
[6주차] Airflow 심화학습 (1) | 2022.09.21 |
---|---|
[5주차] Airflow 소개 (1) | 2022.09.20 |
[3주차] 데이터 엔지니어링을 위한 SQL (0) | 2022.09.12 |
[2주차] Cloud & AWS 그리고 Redshift (0) | 2022.09.08 |
[1주차] 데이터 조직이란 무엇일까? (0) | 2022.09.08 |