빅데이터/airflow
[airflow] 에어플로우 컨셉(airflow concepts)
hs_seo
2020. 8. 2. 16:09
에어플로우 컨셉
에어플로우 공식 컨셉 문서를 통해 작업을 실행하고, 모니터링 하는 방법을 알아보겠습니다.
핵심 구상
DAG
- 파이썬으로 정의한 작업의 모음
- 작업 = Task = Operator
- 동작의 실행순서, 동작 시간, 공통 파라미터 등을 정의
- 에어플로우의
DAG_FOLDER
에 파이썬 파일을 생성하면 에어플로우가 주기적으로 해당 폴더를 스캔하여 인식함
Scope
- 에어플로우는 파이썬 파일에 선언된 DAG를 모두 로딩
- DAG는 글로벌 영역으로 선언되어야 함
- 자주 사용하는 패턴등을
SubDagOperator
로 구현할 수도 있음
기본 파라미터
default_args
는 모든 오퍼레이터에 적용 됨- 공통 파라미터를 모든 오퍼레이터에 전달
default_args = {
'start_date': datetime(2016, 1, 1),
'owner': 'airflow'
}
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow
Context Manager
- 에어플로우 1.8에 추가된 기능으로 오퍼레이터에 DAG를 자동으로 할당
with
문을 이용하여 좀 더 간편한 방식으로 사용 가능
기존 방식
# Context Manager 사용 전 방식
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
Context Manager 방식
# Context Manager 사용 방식
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
op = DummyOperator('op')
op.dag is dag # True
DAG Runs(실행)
- DAG Runs는 DAG의 실행 인스턴스
- DAG는 선언. DAG Runs는 실행 되어 인스턴스화된 상태
- 스케줄러에 따라 실행되는 실행 시점인
execution_date
를 포함하는 태스크 인스턴스를 가짐
execution_date
execution_date
는 논리적인 시간- 물리적인 시간이 현재 2020.01.01 이라도 논리적인 시간은 2019.01.01 일 수 있음
- 1년 전 작업을 재작업 하는 경우가 있을 수 있음
- 실제 작업이 실행되는 시간임
태스크(Task)
- 태스크는 작업의 단위를 구분한 것
- DAG 그래프의 노드가 됨
- BashOperator, PythonOperator 등 기본 구현체가 제공됨
Task간의 관계
- 태스크 간에 선후 관계가 존재함
- 아래의 경우 task_1이 실행후 task_2가 실행됨
- task_1이 task_2의 upstream
- task_2는 task_1의 downstream
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
task_1 = DummyOperator('task_1')
task_2 = DummyOperator('task_2')
task_1 >> task_2 # Define dependencies
Task Instance
- Task가 실행된 상태
- running, success, failed, skipped, up for retry 등의 상태를 가짐
- DAG는 Task를 가지고
- DAG run은 Task Instance를 가짐
Task Instance 간의 관계
- 각 Task 인스턴스는 실행 시점은
execution_date
를 가지고 해당 시간에 맞게 수행 됨
Task Lifecycle
- 태스크는 여러 상태를 가짐
태스크 라이프 사이클
1. No status (scheduler created empty task instance)
+ 스케줄러가 비어있는 Task 인스턴스 생성
2. Scheduled (scheduler determined task instance needs to run)
+ 스케줄러가 실행할 Task 인스턴스를 선언
3. Queued (scheduler sent task to executor to run on the queue)
+ 스케줄러가 실행할 Task 인스턴스를 큐에 입력
4. Running (worker picked up a task and is now running it)
+ 워커가 실행할 태스크를 큐에서 가져감
5. Success (task completed)
+ 태스크가 종료됨
DAGs/task
+ 검은 보더로 표현되는 것은 스케줄링 된 작업
+ 일반 표시는 매뉴얼 트리거 작업
Operator
- DAG는 워크플로우가 동작하는 방법을 묘사
- 오퍼레이터는 실제 작업을 지정
- 오퍼레이터는 워크플로우에서 하나의 태스크
- 오퍼레이터 간에 필요한 공유 정보는
XComs
를 이용함 - 기본 제공 오퍼레이터
- BashOperator - executes a bash command
- PythonOperator - calls an arbitrary Python function
- EmailOperator - sends an email
- SimpleHttpOperator - sends an HTTP request
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, + etc. - executes a SQL command
- Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
- 기본 제공 특화 오퍼레이터
- DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator, 등
- List Airflow operators
- How-to guides for some Airflow operators.
DAG Assignment
- 오퍼레이터에 DAG는 다양한 방법으로 할당할 수 있음
dag
변수를 묵시적, 명시적으로 전달할 수 있음
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
Bitshift Composition
- 오퍼레이터를 구성할 때
>>
,<<
로 구성 가능
# op1 동작후 op2 동작
op1 >> op2
op1.set_downstream(op2)
op2 << op1
op2.set_upstream(op1)
op1 >> op2 >> op3 << op4
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)
# 컨텍스트 매니저를 이용함
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
(
DummyOperator(task_id='dummy_1')
>> BashOperator(
task_id='bash_1',
bash_command='echo "HELLO!"')
>> PythonOperator(
task_id='python_1',
python_callable=lambda: print("GOODBYE!"))
)
# 모두 동일함
op1 >> [op2, op3] >> op4
op1 >> op2 >> op4
op1 >> op3 >> op4
op1.set_downstream([op2, op3])
Relationship Builders
chain
,cross_downstream
함수를 이용해 동적으로 관계를 선언할 수 있음
cross_downstream
사용
[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6
cross_downstream([op1, op2, op3], [op4, op5, op6])
chain
사용
chain(op1, op2, op3, op4, op5)
chain([DummyOperator(task_id='op' + i, dag=dag) for i in range(1, 6)])
Workflows
- DAG
- Python으로 작성된 작업 및 작업 순서
- DAG Run
- 특정 논리 날짜 및 시간에 대한 DAG 인스턴스
- Operator
- 일부 작업을 수행하기위한 템플릿 역할을하는 클래스
- Task
- Python으로 작성된 연산자를 구현하여 작업을 정의
- Task Instance
- 작업의 인스턴스
- DAG에 할당되었고 특정 DAG 실행과 관련된 상태(예 : 특정 execution_date)
- execution_date
- DAG 실행 및 해당 작업 인스턴스의 논리적 날짜 및 시간
추가 기능
Hooks
- Hook은 외부 플랫폼과 인터페이스
- Hive, S3, MySQL, Postgres, HDFS, Pig 등
- 외부 플랫폼과의 인터페이스를 구현하고, 오퍼레이터처럼 동작
airflow.models.connection.Connection
를 이용함
Pools
- 태스크의 병렬 실행을 제한하기 위한 용도로 사용
Menu -> Admin -> Pools
에서 설정
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
Connections
- 외부 시스템에 접속할 때 사용하는 접속 정보를 저장
- Hook에서 이 정보를 이용하여 처리
conn_id
를 동일하게 저장하면 랜덤하게 선택하여 사용- 로드 밸런싱이 가능해짐
Menu -> Admin -> Connections
에서 설정
Queues
CeleryExecutor
를 이용할 때 큐(queue)를 지정해야 함- 이때 큐 정보를 이용함
Xcoms
- 태스크 간에 정보를 교환할 때 이용함
xcom_pull
,xcom_push
를 이용하여 처리
파이썬 코드에서 이용하는 방법
def push_function():
return value
# inside another PythonOperator where provide_context=True
def pull_function(**context):
value = context['task_instance'].xcom_pull(task_ids='pushing_task')
진자 템플릿에서 이용하는 방법
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
Variables
- 변수를 선언하고 가져올 수 있음
Admin -> Variables
에서 설정
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True) # json 형식 처리
baz = Variable.get("baz", default_var=None)
진자 템플릿에서 다음과 같이 이용 가능 함
echo {{ var.value.<variable_name> }}
echo {{ var.json.<variable_name> }}
환경변수 Variable
- 환경 변수에 선언하고 가져올 수 있음
AIRFLOW_VAR_<변수명>
형태로 이용
선언
export AIRFLOW_VAR_FOO=BAR
# To use JSON, store them as JSON strings
export AIRFLOW_VAR_FOO_BAZ='{"hello":"world"}'
사용
from airflow.models import Variable
foo = Variable.get("foo")
foo_json = Variable.get("foo_baz", deserialize_json=True)
분기(Branching)
- 분기를 처리할 때
BranchPythonOperator
를 이용함 - 참고
SubDAGs
- SubDAGs는 동일한 패턴을 반복하는 작업의 선언에 유용함
- 참고
SLAs
- SLA 등록 가능
- 사용하면 작업 오류가 발생한 것은
Browse->SLA Misses
에서 확인 가능 - emmail 전달 가능
Trigger Rules
- 이전 태스크의 동작에 따라 다음 태스크의 동작을 지시할 수 있음
- 기본은
all_success
- all_success: (default) all parents have succeeded
- all_failed: all parents are in a failed or upstream_failed state
- all_done: all parents are done with their execution
- one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
- one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
- none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
- none_failed_or_skipped: all parents have not failed (failed or upstream_failed) and at least one parent has succeeded.
- none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
- dummy: dependencies are just for show, trigger at will
Latest Run Only
execution_time
에 기반하여 이전 시간의 작업은 처리하지 않고 진항 하도록 설정LatestOnlyOperator
를 이용하여 작업 처리
Zombies & Undeads
- 일부 정상적으로 동작하지 않는 작업도 존재함
- 좀비는 하트비트에 응답이 없는 작업들
- 언데드는 동작중이지만 에어플로우가
running
상태로 인식하지 못하는 상태
Cluster Policy
Airflow 태스크에 클러스터 전체 돌연변이를 적용하려는 경우 DAG가로드 된 직후 태스크를 변경하거나 태스크 실행 전에 태스크 인스턴스를 변경할 수 있습니다.
Documentation & Notes
- 에어플로우의 웹 인터페이스 내용을 확인할 수 있게 문서화 가능
- Graph View, Tree View, Task Details 에서 확인 가능
- doc, doc_json, doc_yaml, doc_md, doc_rst 애트리뷰트로 추가 가능 함
dag = DAG('my_dag', default_args=default_args)
dag.doc_md = __doc__
t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""
Jinja Templating
- Jinja 템플릿을 이용하여 파라미터를 전달 할 수 있음
- 사용자가 정의한 변수를 전달하는 것도 가능함
- 매크로를 이용하여 사전에 정의된 함수를 사용할 수도 있음
- 아래는
ds
매크로가 동작하여 env의 맵에 실행 시간 포맷으로 변경되어 입력 됨# The execution date as YYYY-MM-DD date = "{{ ds }}" t = BashOperator( task_id='test_env', bash_command='/tmp/test.sh ', dag=dag, env={'EXECUTION_DATE': date})
참고
반응형