DAG 파일 생성 및 실행 + GCP(구글클라우드플랫폼) - Airflow 연동
목차
DAG 파일 생성 후 Airflow 실행
GCP(구글 클라우드 플랫폼) - Airflow 연동
01. DAG 파일 생성 후 Airflow 실행
우선적으로, docker를 연다. 이미 ※ 첫 번째 포스팅에서 컨테이너를 생성했으므로, 컨테이너를 새로 시작할 땐 시작(start)만 해주면 된다.
※ 첫 번째 포스팅 링크
docker start airflow_v1
Vim과 Procps를 설치해보자.
처음 받은 이미지에는, vi로 파일을 수정할 수 없다. 왜냐하면 위와 같은 파일이 설치가 되어있지 않기 때문이다. 또한 netstate로 네트워크 내역을 확인할 수 있도록 필요한 소프트웨어를 설치해준다. root로 접속해서 설치한다.
docker exec -u root -ti airflow_v1 /bin/bash
apt-get update
apt-get install -y vim
apt-get install -y procps
# 출처: 도시형닌자의 정보 동굴
그 후, 환경설정 파일(cfg)과 데이터베이스 등 전반적인 요소를 초기화해줘야 한다. 초기화 코드는 아래와 같다.
airflow initdb
이제, airflow 내부에 dag 폴더를 생성해주자. 이 dag 폴더 안에 우리가 원하는 dag 파일을 넣을 것이다. 그리고 폴더 안에는, 첫 번째 포스팅 때 다뤘던 요리하기 프로세스 파일을 저장해서 넣어준다.
# airflow 폴더 안에 dag 폴더 생성
mkdir dag
# dag 폴더 안에 cooking.py 파일 생성
vim cooking.py
파일 내부에 첫 번째 포스팅 때 작성했던 소스코드를 넣은 뒤 저장한다.
그 후, 실행을 시켜본다. (python cooking.py) 정상적으로 잘 돌아가면 성공인 것이다.
이제 새로 만든 dag가 리스트에 존재하는지 확인해본다. (+만약 tasks를 검색하고 싶으면 airflow list_tasks!)
airflow list_dags
위처럼, cookflow가 정상적으로 뜬다면 성공이다!
이제 https://localhost:8080에 접속해 잘 뜬다면 연결이 된 것이다.
... 오... 처음 해보는 거라 왕신기합니다.... 하하
cookflow를 클릭하면, 이처럼 프로세스가 보이는 것을 알 수 있다.
이제 간단한 airflow 접근 방법을 알았으니, 우리의 목적인 BigQuery 접근을 시도해보겠다.
https://cloud.google.com/composer/docs/how-to/managing/connections#airflow-1
02. GCP(구글 클라우드 플랫폼) - Airflow 연동
2-1. IAM 권한 확인
본인이 해당 프로젝트에 권한이 제대로 있는지 우선적으로 확인해본다. 또한 Airflow를 사용할 유저도 권한이 존재한 지 확인해보고 없다면 추가를 해준다.
2-2. 서비스 계정의 비공개 키를 생성
IAM 및 관리자 - 서비스 계정 페이지를 접속한 뒤, 서비스 계정에 대한 비공개 키를 생성한다.
그 후 json 파일을 저장해둔다.
위의 사진처럼, 서비스 계정 페이지에서 서비스 계정 만들기를 클릭한다.
그 후, 생성된 서비스 계정을 클릭해 키 추가 버튼을 눌러준 뒤, json 형태로 생성해 저장해두면 된다.
2-3. Airflow Connection
Airflow에 접속해 Admin - Connection 버튼을 눌러, 새로운 연결을 지정해준다.
그 후, Create 버튼을 눌러 계정을 생성해주면 된다.
Keyfile JSON 에는, 아까 저장한 Key 파일 콘텐츠를 복사해 붙여 넣어주면 된다.
그 후, SAVE를 하면 연결이 된 것이다!
이제 dag 파일을 생성할 때, 아래처럼 만들어주면 연결이 된다고 한다.
아래 코드는 GCP 공식문서를 스크랩했다.
※ GCP 공식문서
https://cloud.google.com/composer/docs/how-to/managing/connections#airflow-2_2
task_custom = bigquery.BigQueryInsertJobOperator(
task_id='task_custom_connection',
gcp_conn_id='my_gcp_connection',
configuration={
"query": {
"query": 'SELECT 1',
"useLegacySql": False
}
}
)