Apache Airflow 처음부터 빌드해보기 + 4
현재까지 진행한 내용을 정리하자면, 우리는 VM 안에 새로운 가상환경을 만들어 airflow를 설치했다.
앞으로 우리는 DAG 파일을 만들거고, 가장 먼저 sqlite 에서 데이터를 출력해와 테이블을 생성할 것이다. 이 작업을 함께 진행해보자.
01. DAG 생성
간단한 DAG를 생성해보겠다. Sqlite에 새로운 Table을 주기적으로 생성하는 task을 만들어 볼 것이다!
앞선 포스팅에서 생성한 dags 폴더 안에, dag명을 딴 py 파일을 생성한 뒤, 위와 같이 작성해준다.
설명은 아래와 같다.
- from airflow.models import DAG - DAG import
- from airflow.providers.sqlite.operators.sqlite import SqliteOperator - Sqlite 접근 및 table 생성할 수 있도록 돕는 operator
- start_date - 파라미터. 스케줄이 시작되는 날짜. 실행날짜가 아니라는 것에 유념하기
- DAG(DAG파일명, schedule_interval - 실행 주기, default_args - 파라미터 적용, catchup - start_date가 과거일 경우 start_date 때 부터 지금까지 스케줄을 실행할지에 대한 여부)
- task_id - 해당 task 의 id
- sqlite_conn_id - db연결을 위해 사용할 id, 추후 airflow ui에서 생성할 connection_id와 동일하게 작성
- sql - 원하는 sql문을 ''' ''' 안에 작성
02. DAG 실행
1. 가상환경 접속
source sandbox/bin/activate
2. 웹서버 run
airflow webserver
3. 스케줄러 run
airflow scheduler
4. chrome 통해 localhost:8080 접속
5. 생성된 user_processing dag 파일 활성화 (아래 그림에서 파란 가로열림 버튼 클릭)
💡 만약 추후 모듈을 또 설치하고 싶을 땐 어딜 참조하면 좋을까?
localhost:8080 접속해 home/provider packages reference 접속하여 매개변수 찾아 설치하면 된다.
위 사진을 보면, 스케줄에 맞춰 task가 실행됨을 알 수 있다. (나는 앞선 테스트에서 table이 생성됐는데 또 생성하라는 명령을 주어 fail이 떴고, 이는 sql 문의 if exist 문을 활용해 쉽게 해결할 수 있었다.)
03. Airflow Connection
이제 우리는 sqlite db에 접속하기 위해서, airflow connection을 설정해주어야 한다.
이는, connect 할 db에 접근 권한을 허용해주기 위해 설정한다.
1. localhost:8080 접속, connection - 새로 생성
2. sqlite_conn_id 에 맞게 conn_id 설정 & 기타 설명 작성
※ 주소입력 방법 : airflow - ls 명령 후, pwd 명령 통해 나온 주소(/home/airlfow/airflow)에 /airflow.db 붙여 host 입력
이렇게 connection까지 설정해주었으면, test를 통해서 잘 실행되는지 검증해본다. 문법은 아래와 같다.
airflow tasks test dags명 task명 날짜(이때 날짜는 execution_date)
결과적으로 잘 실행하는 것을 알 수 있다.
.
.
.
[궁금증] test는 실제 실행 결과에 영향을 주지 않는가? - 찾아봐야 할 것!