빅데이터/airflow

[airflow] 워크플로우 모니터링 플랫폼 - apache airflow

hs_seo 2019. 9. 9. 22:57

Airflow 란?

  • 에어비앤비에서 개발한 워크플로우 스케줄링, 모니터링 플랫폼
    • 빅데이터는 수집, 정제, 적제, 분석 과정을 거치면서 여러가지 단계를 거치게 되는데 이 작업들을 관리하기 위한 도구
  • 2019.09 현재 1.10.5 버전이 최신이며 아파치의 탑레벨 프로젝트로 등록^1

ariflow

특징

  • Dynamic
    • 에어플로우 파이프라인(동작순서, 방식)을 파이썬 코드를 이용하여 구성하기 때문에 동적인 구성이 가능
  • Extensible
    • 파이썬을 이용하여 오퍼레이터, 익스큐터를 사용자의 환경에 맞게 확장하여 구성하는 것이 가능함
  • Elegant
    • 에어플로우 파이프라인은 간결하고 명시적이며, 진자 템플릿(jinja template)을 이용하여 파라미터화 된 데이터를 전달하고 자동으로 파이프라인을 생성하는 것이 가능
  • Scalable
    • 분산구조와 메시지큐를 이용하여 많은 수의 워커간의 협업을 지원하고, 스케일 아웃이 가능함

에어플로우 파이프라인이 파이썬을 이용하여 구성되었기 때문에 간결하고 다이나믹하게 변경할 수 있고, 분산 구조를 가지고 있어서 확장성이 높음

Ariflow 설치

에어플로우는 파이썬으로 구현되어 있기 때문에 기본적으로 파이썬과 pip가 설치되어 있어야 합니다. 설치 과정^2은 다음과 같습니다.

# 에어플로우 HOME 위치 설정 
# 지정하지 않았을 때 아래의 기본 위치에 설치 
export AIRFLOW_HOME=~/airflow  # 기본 설정 위치 

# apache-airflow를 설치
pip install apache-airflow

# 데이터베이스 초기화 
# airflow.cfg에 데이터베이스 설정을 하지 않으면 sqlite를 이용
airflow initdb

에어플로우를 설치하고, 데이터베이스를 초기화 하는 것까지 진행하면 에어플로우를 실행할 수 있습니다.

설정 정보

에어플로우 관련 설정은 airflow.cfg파일을 수정하여 진행할 수 있습니다. 실제 운영 서버에서 작업을 위해서는 Celery Executor^3Dask Executor^4를 이용해야 합니다. 위와 같이 설치하는 방법은 테스트 목적으로만 이용하는 것이 좋습니다.

Airflow 실행

에어플로우 설치후 웹서버(webserver)와 스케줄러(scheduler)를 실행해야 합니다.

# 8080 포트로 웹서버 실행, 포트 변경 가능 
airflow webserver -p 8080

# 스케줄러 실행 
airflow scheduler

# 웹서버 실행후 다음으로 접속 
http://localhost:8080 

웹서버 실행후 지정한 포트로 접속하면 DAG 뷰를 확인할 수 있습니다. DAG의 스케줄러를 On으로 변경하면 스케줄러가 실행됩니다.

dag

에어플로우는 기본적인 예제가 포함되어 있습니다. 이를 없애기 위해서는 airflow.cfg 파일의 load_examples설정을 False로 변경해야 합니다.

Airflow 예제

에어플로우 DAG는 python으로 작성하여, AIRFLOW_HOME 아래 dags 폴더에 넣으면 자동으로 인식합니다. 다음은 배쉬 오퍼레이터 t1에 t2, t3 오퍼레이터가 병렬로 동작하는 예제입니다.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# 하루에 한번 실행되는 DAG
dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# BASH 오퍼레이터 
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

작성한 파일을 아래와 같이 dags 폴더에 생성하면 됩니다.

$ ll
total 228
-rw-r--r-- 1 bpsec users  30365 Sep 23 07:33 airflow.cfg
-rw-r--r-- 1 bpsec users 177152 Sep 24 04:46 airflow.db
-rw-r--r-- 1 bpsec users      6 Sep 23 08:34 airflow-webserver.pid
drwxr-xr-x 3 bpsec users   4096 Sep 23 07:45 dags
drwxr-xr-x 8 bpsec users   4096 Sep 23 07:46 logs
-rw-r--r-- 1 bpsec users   2462 Sep 10 06:52 unittests.cfg

$ ls -al dags/
total 20
drwxr-xr-x 3 bpsec users 4096 Sep 23 07:45 .
drwxr-xr-x 4 bpsec users 4096 Sep 24 04:46 ..
-rw-r--r-- 1 bpsec users 1219 Sep 23 07:45 bash_sample.py
-rw-r--r-- 1 bpsec users  809 Sep 19 10:39 hive_sample.py
drwxr-xr-x 2 bpsec users 4096 Sep 23 07:45 __pycache__

커넥션 설정

예제로 제공하는 BashOperator외에도 mysql, hive, pig 등 여러가지 오퍼레이터^5를 제공합니다. 이 오퍼레이터를 이용하기 위해서는 연결정보인 커넥션을 설정해야 합니다.

에어플로우의 장점

  • 강력한 웹 UI
    • 트리, 그래프, 변수, 간트 차트등 다양한 뷰 제공
    • DAG의 실행, 로그, 변수 등 여러가지 정보 확인
  • 다양한 커넥션 제공
    • hive, pig, mysql, mssql 등
  • 활발한 커뮤니티
    • GitHub 홈페이지^6에서 빠르게 수정 및 업데이트가 이루어지고 있음

참고

반응형