데이터 공부/Apache Airflow

DAG 파일 생성 및 실행 + GCP(구글클라우드플랫폼) - Airflow 연동

한소희DE 2021. 6. 29. 19:24

 

 

 

목차

DAG 파일 생성 후 Airflow 실행

GCP(구글 클라우드 플랫폼) - Airflow  연동

 


 

 

 

01. DAG 파일 생성 후 Airflow 실행

 

우선적으로, docker를 연다. 이미 ※ 첫 번째 포스팅에서 컨테이너를 생성했으므로, 컨테이너를 새로 시작할 땐 시작(start)만 해주면 된다.

 

※ 첫 번째 포스팅 링크

 

docker start airflow_v1

 

Vim과 Procps를 설치해보자.

처음 받은 이미지에는, vi로 파일을 수정할 수 없다. 왜냐하면 위와 같은 파일이 설치가 되어있지 않기 때문이다. 또한 netstate로 네트워크 내역을 확인할 수 있도록 필요한 소프트웨어를 설치해준다. root로 접속해서 설치한다.

docker exec -u root -ti airflow_v1 /bin/bash 
apt-get update 
apt-get install -y vim 
apt-get install -y procps

# 출처: 도시형닌자의 정보 동굴

 

그 후, 환경설정 파일(cfg)과 데이터베이스 등 전반적인 요소를 초기화해줘야 한다. 초기화 코드는 아래와 같다.

airflow initdb

 

 

이제, airflow 내부에 dag 폴더를 생성해주자. 이 dag 폴더 안에 우리가 원하는 dag 파일을 넣을 것이다. 그리고 폴더 안에는, 첫 번째 포스팅 때 다뤘던 요리하기 프로세스 파일을 저장해서 넣어준다.

# airflow 폴더 안에 dag 폴더 생성
mkdir dag

# dag 폴더 안에 cooking.py 파일 생성
vim cooking.py

 

파일 내부에 첫 번째 포스팅 때 작성했던  소스코드를 넣은 뒤 저장한다.

그 후, 실행을 시켜본다. (python cooking.py) 정상적으로 잘 돌아가면 성공인 것이다.

 

이제 새로 만든 dag가 리스트에 존재하는지 확인해본다. (+만약 tasks를 검색하고 싶으면 airflow list_tasks!)

airflow list_dags

 

 

위처럼, cookflow가 정상적으로 뜬다면 성공이다!

이제 https://localhost:8080에 접속해 잘 뜬다면 연결이 된 것이다.

 

 

 

... 오... 처음 해보는 거라 왕신기합니다.... 하하

 

 

 

 cookflow를 클릭하면, 이처럼 프로세스가 보이는 것을 알 수 있다.

이제 간단한 airflow 접근 방법을 알았으니, 우리의 목적인 BigQuery 접근을 시도해보겠다.

 

 

 

https://cloud.google.com/composer/docs/how-to/managing/connections#airflow-1

 

 


 

 

02. GCP(구글 클라우드 플랫폼) - Airflow  연동

 

2-1. IAM 권한 확인

본인이 해당 프로젝트에 권한이 제대로 있는지 우선적으로 확인해본다. 또한 Airflow를 사용할 유저도 권한이 존재한 지 확인해보고 없다면 추가를 해준다.

 

 

 

2-2. 서비스 계정의 비공개 키를 생성

 

IAM 및 관리자 - 서비스 계정 페이지를 접속한 뒤, 서비스 계정에 대한 비공개 키를 생성한다.

그 후 json 파일을 저장해둔다.

 

 

위의 사진처럼, 서비스 계정 페이지에서 서비스 계정 만들기를 클릭한다.

 

 

그 후, 생성된 서비스 계정을 클릭해 키 추가 버튼을 눌러준 뒤, json 형태로 생성해 저장해두면 된다.

 

 

 

 

2-3. Airflow Connection 

Airflow에 접속해 Admin - Connection 버튼을 눌러, 새로운 연결을 지정해준다.

 

 

그 후, Create 버튼을 눌러 계정을 생성해주면 된다.

 

 

 

Keyfile JSON 에는, 아까 저장한 Key 파일 콘텐츠를 복사해 붙여 넣어주면 된다.

그 후, SAVE를 하면 연결이 된 것이다!

 

 

이제 dag 파일을 생성할 때, 아래처럼 만들어주면 연결이 된다고 한다.

아래 코드는 GCP 공식문서를 스크랩했다.

 

※ GCP 공식문서

https://cloud.google.com/composer/docs/how-to/managing/connections#airflow-2_2

 

Airflow 연결 관리  |  Cloud Composer  |  Google Cloud

이 페이지에서는 Airflow 연결을 사용하는 방법을 설명합니다. Airflow 연결을 사용하면 Cloud Composer 환경에서 Google Cloud 프로젝트의 리소스에 액세스 할 수 있습니다. 로그인 및 호스트 이름과 같은

cloud.google.com

 

task_custom = bigquery.BigQueryInsertJobOperator(
    task_id='task_custom_connection',
    gcp_conn_id='my_gcp_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)