Airflow 란?
- 에어비앤비에서 개발한 워크플로우 스케줄링, 모니터링 플랫폼
- 빅데이터는 수집, 정제, 적제, 분석 과정을 거치면서 여러가지 단계를 거치게 되는데 이 작업들을 관리하기 위한 도구
- 2019.09 현재 1.10.5 버전이 최신이며 아파치의 탑레벨 프로젝트로 등록^1
특징
- Dynamic
- 에어플로우 파이프라인(동작순서, 방식)을 파이썬 코드를 이용하여 구성하기 때문에 동적인 구성이 가능
- Extensible
- 파이썬을 이용하여 오퍼레이터, 익스큐터를 사용자의 환경에 맞게 확장하여 구성하는 것이 가능함
- Elegant
- 에어플로우 파이프라인은 간결하고 명시적이며, 진자 템플릿(jinja template)을 이용하여 파라미터화 된 데이터를 전달하고 자동으로 파이프라인을 생성하는 것이 가능
- Scalable
- 분산구조와 메시지큐를 이용하여 많은 수의 워커간의 협업을 지원하고, 스케일 아웃이 가능함
에어플로우 파이프라인이 파이썬을 이용하여 구성되었기 때문에 간결하고 다이나믹하게 변경할 수 있고, 분산 구조를 가지고 있어서 확장성이 높음
Ariflow 설치
에어플로우는 파이썬으로 구현되어 있기 때문에 기본적으로 파이썬과 pip
가 설치되어 있어야 합니다. 설치 과정^2은 다음과 같습니다.
# 에어플로우 HOME 위치 설정
# 지정하지 않았을 때 아래의 기본 위치에 설치
export AIRFLOW_HOME=~/airflow # 기본 설정 위치
# apache-airflow를 설치
pip install apache-airflow
# 데이터베이스 초기화
# airflow.cfg에 데이터베이스 설정을 하지 않으면 sqlite를 이용
airflow initdb
에어플로우를 설치하고, 데이터베이스를 초기화 하는 것까지 진행하면 에어플로우를 실행할 수 있습니다.
설정 정보
에어플로우 관련 설정은 airflow.cfg
파일을 수정하여 진행할 수 있습니다. 실제 운영 서버에서 작업을 위해서는 Celery Executor
^3나 Dask Executor
^4를 이용해야 합니다. 위와 같이 설치하는 방법은 테스트 목적으로만 이용하는 것이 좋습니다.
Airflow 실행
에어플로우 설치후 웹서버(webserver)와 스케줄러(scheduler)를 실행해야 합니다.
# 8080 포트로 웹서버 실행, 포트 변경 가능
airflow webserver -p 8080
# 스케줄러 실행
airflow scheduler
# 웹서버 실행후 다음으로 접속
http://localhost:8080
웹서버 실행후 지정한 포트로 접속하면 DAG 뷰를 확인할 수 있습니다. DAG의 스케줄러를 On
으로 변경하면 스케줄러가 실행됩니다.
에어플로우는 기본적인 예제가 포함되어 있습니다. 이를 없애기 위해서는
airflow.cfg
파일의load_examples
설정을 False로 변경해야 합니다.
Airflow 예제
에어플로우 DAG는 python
으로 작성하여, AIRFLOW_HOME
아래 dags
폴더에 넣으면 자동으로 인식합니다. 다음은 배쉬 오퍼레이터 t1에 t2, t3 오퍼레이터가 병렬로 동작하는 예제입니다.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# 하루에 한번 실행되는 DAG
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# BASH 오퍼레이터
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
작성한 파일을 아래와 같이 dags
폴더에 생성하면 됩니다.
$ ll
total 228
-rw-r--r-- 1 bpsec users 30365 Sep 23 07:33 airflow.cfg
-rw-r--r-- 1 bpsec users 177152 Sep 24 04:46 airflow.db
-rw-r--r-- 1 bpsec users 6 Sep 23 08:34 airflow-webserver.pid
drwxr-xr-x 3 bpsec users 4096 Sep 23 07:45 dags
drwxr-xr-x 8 bpsec users 4096 Sep 23 07:46 logs
-rw-r--r-- 1 bpsec users 2462 Sep 10 06:52 unittests.cfg
$ ls -al dags/
total 20
drwxr-xr-x 3 bpsec users 4096 Sep 23 07:45 .
drwxr-xr-x 4 bpsec users 4096 Sep 24 04:46 ..
-rw-r--r-- 1 bpsec users 1219 Sep 23 07:45 bash_sample.py
-rw-r--r-- 1 bpsec users 809 Sep 19 10:39 hive_sample.py
drwxr-xr-x 2 bpsec users 4096 Sep 23 07:45 __pycache__
커넥션 설정
예제로 제공하는 BashOperator
외에도 mysql, hive, pig 등 여러가지 오퍼레이터^5를 제공합니다. 이 오퍼레이터를 이용하기 위해서는 연결정보인 커넥션을 설정해야 합니다.
에어플로우의 장점
- 강력한 웹 UI
- 트리, 그래프, 변수, 간트 차트등 다양한 뷰 제공
- DAG의 실행, 로그, 변수 등 여러가지 정보 확인
- 다양한 커넥션 제공
- hive, pig, mysql, mssql 등
- 활발한 커뮤니티
- GitHub 홈페이지^6에서 빠르게 수정 및 업데이트가 이루어지고 있음
참고
- Celery - http://www.celeryproject.org/
'빅데이터 > airflow' 카테고리의 다른 글
[airflow] PythonOperator에서 TypeError: function() got an unexpected keyword argument 'conf' 오류 (0) | 2021.11.25 |
---|---|
[airflow] BashOperator에서 jinja2.exceptions.TemplateNotFound: 오류 (1) | 2021.11.18 |
[airflow] 에어플로우 설치 (0) | 2021.10.14 |
[airflow] 에어플로우 RestApi 설정 (0) | 2021.10.06 |
[airflow] 에어플로우 컨셉(airflow concepts) (0) | 2020.08.02 |