빅데이터/airflow15 [airflow] 에어플로우 로그를 정리할 때 사용할 수 있는 명령어 에어플로우가 오랜 기간 실행되면 에어플로우의 로그와 DAG의 실행 로그가 로컬에 쌓이게 됩니다. 이 로그를 주기적으로 정리해 주어야 할 때 사용할 수 있는 명령어 입니다. 다음은 로그 파일의 사이즈를 0으로 초기화 하는 명령어 입니다. truncate -s 0 airflow-worker.errcat /dev/null > airflow-worker.err: > airflow-worker.err> airflow-worker.err DAG 로그를 초기화 하기 위해서는 find 명령어를 이용합니다. 폴더로 파일이 생성되기 때문에 rm 명령어를 이용하고, DAG이 생성되는 날짜를 이용하여 처리합니다. # airflow/logs 폴더에서 사용find ./ -name "*2024-10-11* -maxdepth 2.. 2024. 10. 11. [airflow] apache-airflow-providers-apache-hive 설치 중 오류 처리 에어플로우에서 hive hook을 사용하기 위해서 apache-airflow-providers-apache-hive를 설치 하는 중 라이브러리 관련 오류가 발생하여 필요한 라이브러리를 설치 후 오류를 해결할 수 있었습니다. # sasl 오류 sasl/saslwrapper.h:22:10: fatal error: sasl/sasl.h: No such file or directory # gcc 오류 gcc: error trying to exec 'cc1plus': execvp: Not a directory 이런 오류가 발생할 때 다음의 라이브러리를 먼저 설치후 빌드 하면 됩니다. # 모든 apt 설치. # gcc, g++, libsasl2-dev 3개를 설치 하면 오류를 처리할 수 있음 RUN apt-get .. 2023. 8. 13. [airflow] PythonOperator의 op_args, op_kwargs 변수 전달 파이썬 오퍼레이터에 변수를 넘기는 방법을 알아보겠습니다. 파이썬 오퍼레이터(PythonOperator)는 `op_args`, `op_kwargs` 두 가지 변수를 받을 수 있습니다. op_args operator를 생성할 때 op_args 변수로 전달 op_kwargs 오퍼레이터에 전달 되는 모든 변수를 전달 op_kwargs 의 키에 맞는 변수를 선언하면 자동으로 전달 params 는 에어플로우 DAG 을 실행할 때 전달한 파라미터 (Trigger DAG w/ config 이용) 2023. 7. 2. [airflow 운영] 에어플로우 로그 정리 에어플로우 DAG 이 실행되면 로컬에 log가 쌓이게 됩니다. 분/시간 단위로 로그가 쌓이게 되면 시간이 지나면 문제가 발생하게 됩니다. 에어플로우를 운영할 때는 로컬에 쌓이게 된는 DAG 로그를 정리하는 것도 중요합니다. 따라서 주기적으로 에어플로우 로그를 삭제할 수 있는 스크립트를 크론잡에 설정하거나, 주기적으로 실행하여야 합니다. 2022. 10. 25. [airflow] 에어플로우 설정 환경 변수로 설정 에어플로우는 airflow.cfg 에 설정을 할 수도 있지만 실행전에 export 로 환경변수로 설정하면 해당 정보를 실행 시점에 읽어서 변수에 반영합니다. 주의할 점은 버전에 따라 문서의 환경변수가 적용되지 않을 수도 있습니다. 저는 2.2.5 버전의 에어플로우를 사용하는데 문서에는 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN 로 되어 있지만, AIRFLOW__CORE__SQL_ALCHEMY_CONN 으로 설정해야 설정이 적용되었습니다. 이는 airflow.cfg 설정파일을 확인해 보면 [core] 아래 sql_alchemy_conn 이 있는 것을 확인할 수 있습니다. 에어플로우는 설정 파일에 적용된 순서에 따라 파싱을 진행하기 때문에 airflow.cfg 파일을 확인하면 설정명을 .. 2022. 7. 21. [airflow] Xcom을 이용하여 DAG 간 데이터 전달 에어플로우에서 DAG 간 데이터를 전달하는 방법으로 xcom을 이용할 수 있습니다. context의 task_instance 객체를 이용해서 키-밸류 형태로 데이터를 전달할 수 있습니다. 다음은 데이터를 전달하여 branch를 처리하는 방법 예제입니다. 2022. 4. 3. [airflow] DAG에서 사용하는 멀티 프로세스에서 발생하는 AssertionError: daemonic processes are not allowed to have children 에어플로우 DAG에서 멀티 프로세스를 이용하는 경우 파이썬 프로세스에서 또 프로세스를 생성할 수 없다는 오류가 발생합니다. File "/usr/local/lib/python3.8/multiprocessing/process.py", line 118, in start assert not _current_process._config.get('daemon'), \ AssertionError: daemonic processes are not allowed to have children 해결방법 이 경우 멀티프로세스를 스레드 풀로 변경하면 됩니다. 다음과 같이 사용할 수 있습니다. 2022. 2. 24. [airflow] 에어플로우 설치(celery executor + redis) 에어플로우는 웹서버, 스케줄러, 익스큐터, 메세지 큐로 구성할 수 있습니다. 웹서버, 스케줄러는 에어플로우를 설치하면 되고, 익스큐터는 로컬 익스큐터, 셀러리 익스큐터는 에어플로우를 설치하면 구성되고, 쿠버네티스 익스큐터는 따로 설정해야 합니다. 그리고 여러개의 익스큐터와 스케줄러의 통신을 위해서 메시지 큐를 설치할 수 있습니다. 또한 익스큐터간 데이터 저장을 위한 DB 도 필요합니다. 따라서 에어플로우 하나를 사용하기 위해서는 다음의 5개 컴포넌트가 필요합니다. (시퀀셜 익스큐터를 이용하는 경우 sqlite를 이용해서 웹서버, 스케줄러, 익스큐터 만 있으면 됩니다.) 여기서는 셀러리 익스큐터를 이용하는 방법을 알아보겠습니다. 웹서버 스케줄러 익스큐터 시퀀셜 익스큐터 셀러리 익스큐터 메세지큐 레디스 Ra.. 2021. 12. 20. [airflow] 에어플로우 DB 초기화 중 Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql 오류 에어플로우 설치중 mysql 5.x 버전을 이용할 때 다음과 같은 오류가 발생하는 경우가 있습니다. File "/home/deploy/.local/lib/python3.6/site-packages/alembic/runtime/migration.py", line 560, in run_migrations step.migration_fn(**kw) File "/home/deploy/.local/lib/python3.6/site-packages/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py", line 44, in upgrade raise Exception("Global variable explicit_defaults_for_timest.. 2021. 11. 30. [airflow] PythonOperator에서 TypeError: function() got an unexpected keyword argument 'conf' 오류 에어플로우에서 파이썬 오퍼레이터를 이용할 때 이런 오류가 발생하는 경우가 있습니다. [2021-11-25 09:49:49,343] {taskinstance.py:1150} ERROR - func() got an unexpected keyword argument 'conf' Traceback (most recent call last): File "/home/deploy/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task result = task_copy.execute(context=context) File "/home/deploy/.local/lib/python3.6/site-package.. 2021. 11. 25. [airflow] BashOperator에서 jinja2.exceptions.TemplateNotFound: 오류 에어플로우 BashOperator 에서 템플릿을 사용하지 않는데, TelplateNotFound 오류가 발생하는 경우가 있습니다. 이 경우 스크립트 뒤에 공백을 추가하면 됩니다. [2021-11-17 18:14:21,099] {taskinstance.py:1150} ERROR - /home/user/sample.sh Traceback (most recent call last): File "/home/deploy/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 965, in _run_raw_task self.render_templates(context=context) File "/home/deploy/.local/lib/py.. 2021. 11. 18. [airflow] 에어플로우 설치 간단하게 에어플로우를 설치하고, 시퀀셜 익스큐터(SequentialExecutor)와 sqlite를 이용하여 테스트용으로 사용할 수 있는 에어플로우를 설치 하는 방법을 알아보겠습니다. * 이렇게 에어플로우를 설치하면 실행할 수 있지만, 이 에어플로우는 시퀀설 익스큐터를 이용하기 때문에 한번에 하나의 작업만 가능합니다. 2021. 10. 14. [airflow] 에어플로우 RestApi 설정 에어플로우 기본 설정은 REST API 로 접속하면 거부하게 설정이 되어 있습니다. airflow.cfg 설정의 REST API 접근 오류 # 명령어로 현재 설정 상태 확인 $ airflow config get-value api auth_backend airflow.api.auth.backend.deny_all # airflow.cfg의 설정 확인 auth_backend = airflow.api.auth.backend.deny_all # rest api 호출시 오류 $ curl http://0.0.0.0:8080/api/v1/dags/example/dagRuns { "detail": null, "status": 401, "title": "Unauthorized", "type": "https://airfl.. 2021. 10. 6. [airflow] 에어플로우 컨셉(airflow concepts) 에어플로우 컨셉 에어플로우 공식 컨셉 문서를 통해 작업을 실행하고, 모니터링 하는 방법을 알아보겠습니다. 핵심 구상 DAG 파이썬으로 정의한 작업의 모음 작업 = Task = Operator 동작의 실행순서, 동작 시간, 공통 파라미터 등을 정의 에어플로우의 DAG_FOLDER에 파이썬 파일을 생성하면 에어플로우가 주기적으로 해당 폴더를 스캔하여 인식함 Scope 에어플로우는 파이썬 파일에 선언된 DAG를 모두 로딩 DAG는 글로벌 영역으로 선언되어야 함 자주 사용하는 패턴등을 SubDagOperator로 구현할 수도 있음 기본 파라미터 default_args는 모든 오퍼레이터에 적용 됨 공통 파라미터를 모든 오퍼레이터에 전달 default_args = { 'start_date': date.. 2020. 8. 2. [airflow] 워크플로우 모니터링 플랫폼 - apache airflow Airflow 란? 에어비앤비에서 개발한 워크플로우 스케줄링, 모니터링 플랫폼 빅데이터는 수집, 정제, 적제, 분석 과정을 거치면서 여러가지 단계를 거치게 되는데 이 작업들을 관리하기 위한 도구 2019.09 현재 1.10.5 버전이 최신이며 아파치의 탑레벨 프로젝트로 등록^1 특징 Dynamic 에어플로우 파이프라인(동작순서, 방식)을 파이썬 코드를 이용하여 구성하기 때문에 동적인 구성이 가능 Extensible 파이썬을 이용하여 오퍼레이터, 익스큐터를 사용자의 환경에 맞게 확장하여 구성하는 것이 가능함 Elegant 에어플로우 파이프라인은 간결하고 명시적이며, 진자 템플릿(jinja template)을 이용하여 파라미터화 된 데이터를 전달하고 자동으로 파이프라인을 생성하는 것이 가능 Scalable .. 2019. 9. 9. 이전 1 다음