빅데이터/airflow

[airflow] 에어플로우 컨셉(airflow concepts)

hs_seo 2020. 8. 2. 16:09

에어플로우 컨셉

에어플로우 공식 컨셉 문서를 통해 작업을 실행하고, 모니터링 하는 방법을 알아보겠습니다.

핵심 구상

DAG

  • 파이썬으로 정의한 작업의 모음
    • 작업 = Task = Operator
  • 동작의 실행순서, 동작 시간, 공통 파라미터 등을 정의
  • 에어플로우의 DAG_FOLDER에 파이썬 파일을 생성하면 에어플로우가 주기적으로 해당 폴더를 스캔하여 인식함

Scope

  • 에어플로우는 파이썬 파일에 선언된 DAG를 모두 로딩
    • DAG는 글로벌 영역으로 선언되어야 함
    • 자주 사용하는 패턴등을 SubDagOperator로 구현할 수도 있음

기본 파라미터

  • default_args는 모든 오퍼레이터에 적용 됨
    • 공통 파라미터를 모든 오퍼레이터에 전달
default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'airflow'
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

Context Manager

  • 에어플로우 1.8에 추가된 기능으로 오퍼레이터에 DAG를 자동으로 할당
  • with 문을 이용하여 좀 더 간편한 방식으로 사용 가능

기존 방식

# Context Manager 사용 전 방식 
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)

Context Manager 방식

# Context Manager 사용 방식 
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')

op.dag is dag # True

DAG Runs(실행)

  • DAG Runs는 DAG의 실행 인스턴스
    • DAG는 선언. DAG Runs는 실행 되어 인스턴스화된 상태
    • 스케줄러에 따라 실행되는 실행 시점인 execution_date를 포함하는 태스크 인스턴스를 가짐

execution_date

  • execution_date는 논리적인 시간
  • 물리적인 시간이 현재 2020.01.01 이라도 논리적인 시간은 2019.01.01 일 수 있음
    • 1년 전 작업을 재작업 하는 경우가 있을 수 있음
  • 실제 작업이 실행되는 시간임

태스크(Task)

  • 태스크는 작업의 단위를 구분한 것
    • DAG 그래프의 노드가 됨
    • BashOperator, PythonOperator 등 기본 구현체가 제공됨

Task간의 관계

  • 태스크 간에 선후 관계가 존재함
  • 아래의 경우 task_1이 실행후 task_2가 실행됨
    • task_1이 task_2의 upstream
    • task_2는 task_1의 downstream
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    task_1 = DummyOperator('task_1')
    task_2 = DummyOperator('task_2')
    task_1 >> task_2 # Define dependencies

Task Instance

  • Task가 실행된 상태
    • running, success, failed, skipped, up for retry 등의 상태를 가짐
    • DAG는 Task를 가지고
    • DAG run은 Task Instance를 가짐

Task Instance 간의 관계

  • 각 Task 인스턴스는 실행 시점은 execution_date를 가지고 해당 시간에 맞게 수행 됨

Task Lifecycle

  • 태스크는 여러 상태를 가짐
    task status

태스크 라이프 사이클

1. No status (scheduler created empty task instance)
      + 스케줄러가 비어있는 Task 인스턴스 생성 
2. Scheduled (scheduler determined task instance needs to run)
      + 스케줄러가 실행할 Task 인스턴스를 선언 
3. Queued (scheduler sent task to executor to run on the queue)
      + 스케줄러가 실행할 Task 인스턴스를 큐에 입력 
4. Running (worker picked up a task and is now running it)
      + 워커가 실행할 태스크를 큐에서 가져감 
5. Success (task completed)
      + 태스크가 종료됨 

task lifecycle

DAGs/task

+ 검은 보더로 표현되는 것은 스케줄링 된 작업
+ 일반 표시는 매뉴얼 트리거 작업 

border

Operator

  • DAG는 워크플로우가 동작하는 방법을 묘사
  • 오퍼레이터는 실제 작업을 지정
  • 오퍼레이터는 워크플로우에서 하나의 태스크
  • 오퍼레이터 간에 필요한 공유 정보는 XComs를 이용함
  • 기본 제공 오퍼레이터
    • BashOperator - executes a bash command
    • PythonOperator - calls an arbitrary Python function
    • EmailOperator - sends an email
    • SimpleHttpOperator - sends an HTTP request
    • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, + etc. - executes a SQL command
    • Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
  • 기본 제공 특화 오퍼레이터
    • DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator, 등
  • List Airflow operators
  • How-to guides for some Airflow operators.

DAG Assignment

  • 오퍼레이터에 DAG는 다양한 방법으로 할당할 수 있음
  • dag 변수를 묵시적, 명시적으로 전달할 수 있음
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

Bitshift Composition

  • 오퍼레이터를 구성할 때 >>, << 로 구성 가능
# op1 동작후 op2 동작 
op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)
op1 >> op2 >> op3 << op4

op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)
# 컨텍스트 매니저를 이용함 
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )

# 모두 동일함 
op1 >> [op2, op3] >> op4

op1 >> op2 >> op4
op1 >> op3 >> op4

op1.set_downstream([op2, op3])

Relationship Builders

  • chain, cross_downstream 함수를 이용해 동적으로 관계를 선언할 수 있음

cross_downstream 사용

[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6

cross_downstream([op1, op2, op3], [op4, op5, op6])

chain 사용

chain(op1, op2, op3, op4, op5)

chain([DummyOperator(task_id='op' + i, dag=dag) for i in range(1, 6)])

Workflows

  • DAG
    • Python으로 작성된 작업 및 작업 순서
  • DAG Run
    • 특정 논리 날짜 및 시간에 대한 DAG 인스턴스
  • Operator
    • 일부 작업을 수행하기위한 템플릿 역할을하는 클래스
  • Task
    • Python으로 작성된 연산자를 구현하여 작업을 정의
  • Task Instance
    • 작업의 인스턴스
    • DAG에 할당되었고 특정 DAG 실행과 관련된 상태(예 : 특정 execution_date)
  • execution_date
    • DAG 실행 및 해당 작업 인스턴스의 논리적 날짜 및 시간

추가 기능

Hooks

  • Hook은 외부 플랫폼과 인터페이스
    • Hive, S3, MySQL, Postgres, HDFS, Pig 등
  • 외부 플랫폼과의 인터페이스를 구현하고, 오퍼레이터처럼 동작
  • airflow.models.connection.Connection를 이용함

List Airflow hooks

Pools

  • 태스크의 병렬 실행을 제한하기 위한 용도로 사용
  • Menu -> Admin -> Pools에서 설정
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

Connections

  • 외부 시스템에 접속할 때 사용하는 접속 정보를 저장
  • Hook에서 이 정보를 이용하여 처리
  • conn_id를 동일하게 저장하면 랜덤하게 선택하여 사용
    • 로드 밸런싱이 가능해짐
  • Menu -> Admin -> Connections에서 설정

Queues

  • CeleryExecutor를 이용할 때 큐(queue)를 지정해야 함
  • 이때 큐 정보를 이용함

Xcoms

  • 태스크 간에 정보를 교환할 때 이용함
  • xcom_pull, xcom_push를 이용하여 처리

파이썬 코드에서 이용하는 방법

def push_function():
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')

진자 템플릿에서 이용하는 방법

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

Variables

  • 변수를 선언하고 가져올 수 있음
  • Admin -> Variables에서 설정
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)  # json 형식 처리 
baz = Variable.get("baz", default_var=None)

진자 템플릿에서 다음과 같이 이용 가능 함

echo {{ var.value.<variable_name> }}

echo {{ var.json.<variable_name> }}

환경변수 Variable

  • 환경 변수에 선언하고 가져올 수 있음
  • AIRFLOW_VAR_<변수명> 형태로 이용

선언

export AIRFLOW_VAR_FOO=BAR

# To use JSON, store them as JSON strings
export AIRFLOW_VAR_FOO_BAZ='{"hello":"world"}'

사용

from airflow.models import Variable
foo = Variable.get("foo")
foo_json = Variable.get("foo_baz", deserialize_json=True)

분기(Branching)

  • 분기를 처리할 때 BranchPythonOperator를 이용함
  • 참고

SubDAGs

  • SubDAGs는 동일한 패턴을 반복하는 작업의 선언에 유용함
  • 참고

SLAs

  • SLA 등록 가능
  • 사용하면 작업 오류가 발생한 것은 Browse->SLA Misses에서 확인 가능
  • emmail 전달 가능

Trigger Rules

  • 이전 태스크의 동작에 따라 다음 태스크의 동작을 지시할 수 있음
  • 기본은 all_success
    • all_success: (default) all parents have succeeded
    • all_failed: all parents are in a failed or upstream_failed state
    • all_done: all parents are done with their execution
    • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
    • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
    • none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
    • none_failed_or_skipped: all parents have not failed (failed or upstream_failed) and at least one parent has succeeded.
    • none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
    • dummy: dependencies are just for show, trigger at will

Latest Run Only

  • execution_time에 기반하여 이전 시간의 작업은 처리하지 않고 진항 하도록 설정
  • LatestOnlyOperator를 이용하여 작업 처리

Zombies & Undeads

  • 일부 정상적으로 동작하지 않는 작업도 존재함
  • 좀비는 하트비트에 응답이 없는 작업들
  • 언데드는 동작중이지만 에어플로우가 running 상태로 인식하지 못하는 상태

Cluster Policy

Airflow 태스크에 클러스터 전체 돌연변이를 적용하려는 경우 DAG가로드 된 직후 태스크를 변경하거나 태스크 실행 전에 태스크 인스턴스를 변경할 수 있습니다.

Documentation & Notes

  • 에어플로우의 웹 인터페이스 내용을 확인할 수 있게 문서화 가능
    • Graph View, Tree View, Task Details 에서 확인 가능
    • doc, doc_json, doc_yaml, doc_md, doc_rst 애트리뷰트로 추가 가능 함
dag = DAG('my_dag', default_args=default_args)
dag.doc_md = __doc__

t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""

Jinja Templating

  • Jinja 템플릿을 이용하여 파라미터를 전달 할 수 있음
  • 사용자가 정의한 변수를 전달하는 것도 가능함
  • 매크로를 이용하여 사전에 정의된 함수를 사용할 수도 있음
  • 아래는 ds 매크로가 동작하여 env의 맵에 실행 시간 포맷으로 변경되어 입력 됨
    # The execution date as YYYY-MM-DD
    date = "{{ ds }}"
    t = BashOperator(
      task_id='test_env',
      bash_command='/tmp/test.sh ',
      dag=dag,
      env={'EXECUTION_DATE': date})

참고

반응형