티스토리 뷰
에어플로우에서 DAG 간 데이터를 전달하는 방법으로 xcom을 이용할 수 있습니다.
context의 task_instance 객체를 이용해서 키-밸류 형태로 데이터를 전달할 수 있습니다.
다음은 데이터를 전달하여 branch를 처리하는 방법 예제입니다.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from airflow.models import DAG, Variable | |
from airflow.operators.python import PythonOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.operators.python_operator import BranchPythonOperator | |
args = {'owner': 'deploy', | |
'start_date': datetime.datetime(2022, 3, 28), | |
'retries': 0, | |
'catchup': False | |
} | |
def func_a(**context): | |
print("AAA") | |
task_instance = context['task_instance'] | |
task_instance.xcom_push(key='is_true', value=False) | |
def func_b(**context): | |
print("BBB") | |
def func_c(**context): | |
print("CCC") | |
def fuct_failure(params): | |
print("### FAILURE ###") | |
def branch_func(**context): | |
task_instance = context['task_instance'] | |
is_true = task_instance.xcom_pull(key='is_true') | |
if is_true: | |
return 't1_b' | |
else: | |
return 't1_c' | |
with DAG(dag_id='test_python', default_args=args, schedule_interval=None, concurrency=40, max_active_runs=16) as dag: | |
t1_create_cluster = PythonOperator(task_id='t1_a', python_callable=func_a, on_failure_callback=fuct_failure, dag=dag) | |
t2_create_cluster = PythonOperator(task_id='t1_b', python_callable=func_b, on_failure_callback=fuct_failure, dag=dag) | |
t3_create_cluster = PythonOperator(task_id='t1_c', python_callable=func_c, on_failure_callback=fuct_failure, dag=dag) | |
branch_op = BranchPythonOperator(task_id='branch_task', python_callable=branch_func, dag=dag) | |
t1_create_cluster >> branch_op | |
branch_op >> t2_create_cluster | |
branch_op >> t3_create_cluster |
반응형
'빅데이터 > airflow' 카테고리의 다른 글
[airflow 운영] 에어플로우 로그 정리 (1) | 2022.10.25 |
---|---|
[airflow] 에어플로우 설정 환경 변수로 설정 (0) | 2022.07.21 |
[airflow] DAG에서 사용하는 멀티 프로세스에서 발생하는 AssertionError: daemonic processes are not allowed to have children (0) | 2022.02.24 |
[airflow] 에어플로우 설치(celery executor + redis) (0) | 2021.12.20 |
[airflow] 에어플로우 DB 초기화 중 Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql 오류 (0) | 2021.11.30 |
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- hbase
- Linux
- yarn
- HIVE
- mysql
- 하이브
- emr
- 하둡
- AWS
- error
- 알고리즘
- Tez
- SQL
- 파이썬
- build
- 정올
- java
- 백준
- airflow
- Python
- HDFS
- 다이나믹
- Hadoop
- SPARK
- oozie
- nodejs
- 오류
- bash
- ubuntu
- S3
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
글 보관함