데이터 공부/Apache Airflow

Apache Airflow 처음부터 빌드해보기 + 4

한소희DE 2021. 9. 24. 09:05

현재까지 진행한 내용을 정리하자면, 우리는 VM 안에 새로운 가상환경을 만들어 airflow를 설치했다.

앞으로 우리는 DAG 파일을 만들거고, 가장 먼저 sqlite 에서 데이터를 출력해와 테이블을 생성할 것이다. 이 작업을 함께 진행해보자.

 

01. DAG 생성

간단한 DAG를 생성해보겠다. Sqlite에 새로운 Table을 주기적으로 생성하는 task을 만들어 볼 것이다!

 

앞선 포스팅에서 생성한 dags 폴더 안에, dag명을 딴 py 파일을 생성한 뒤, 위와 같이 작성해준다.

설명은 아래와 같다.

 

  • from airflow.models import DAG - DAG import
  • from airflow.providers.sqlite.operators.sqlite import SqliteOperator - Sqlite 접근 및 table 생성할 수 있도록 돕는 operator
  • start_date - 파라미터. 스케줄이 시작되는 날짜. 실행날짜가 아니라는 것에 유념하기
  • DAG(DAG파일명, schedule_interval - 실행 주기, default_args - 파라미터 적용, catchup - start_date가 과거일 경우 start_date 때 부터 지금까지 스케줄을 실행할지에 대한 여부)
  • task_id - 해당 task 의 id
  • sqlite_conn_id - db연결을 위해 사용할 id, 추후 airflow ui에서 생성할 connection_id와 동일하게 작성
  • sql - 원하는 sql문을 ''' ''' 안에 작성

 

02. DAG 실행

 

1. 가상환경 접속

source sandbox/bin/activate

2. 웹서버 run

airflow webserver

 

3. 스케줄러 run

airflow scheduler

 

4. chrome 통해 localhost:8080 접속

5. 생성된 user_processing dag 파일 활성화 (아래 그림에서 파란 가로열림 버튼 클릭)

 

💡 만약 추후 모듈을 또 설치하고 싶을 땐 어딜 참조하면 좋을까?
localhost:8080 접속해 home/provider packages reference 접속하여 매개변수 찾아 설치하면 된다.

 

 

위 사진을 보면, 스케줄에 맞춰 task가 실행됨을 알 수 있다. (나는 앞선 테스트에서 table이 생성됐는데 또 생성하라는 명령을 주어 fail이 떴고, 이는 sql 문의 if exist 문을 활용해 쉽게 해결할 수 있었다.)

 


 

03. Airflow Connection

이제 우리는 sqlite db에 접속하기 위해서, airflow connection을 설정해주어야 한다.

이는, connect 할 db에 접근 권한을 허용해주기 위해 설정한다.

 

1. localhost:8080 접속, connection - 새로 생성

 

2. sqlite_conn_id 에 맞게 conn_id 설정 & 기타 설명 작성

 

※ 주소입력 방법 : airflow - ls 명령 후, pwd 명령 통해 나온 주소(/home/airlfow/airflow)에 /airflow.db 붙여 host 입력

 

 

 

이렇게 connection까지 설정해주었으면, test를 통해서 잘 실행되는지 검증해본다. 문법은 아래와 같다.

airflow tasks test dags명 task명 날짜(이때 날짜는 execution_date)

 

 

결과적으로 잘 실행하는 것을 알 수 있다. 

 

.

.

.

 

[궁금증] test는 실제 실행 결과에 영향을 주지 않는가? - 찾아봐야 할 것!