데이터 공부/Apache Airflow

Airflow XCOM 알아보기

한소희DE 2023. 7. 16. 22:48

안녕하세요 한소희입니다. 공부를 통해 배운 내용을 작성하고 있습니다. 혹여 해당 포스팅에서 잘못된 부분이 있을 경우, 알려주시면 빠르게 수정 조치하도록 하겠습니다.

 

 

 

01. Airflow XCOM이란? 

XCOM 은, cross communcation 의 약자로 하나의 DAG 내 task간 데이터를 주고 받고 싶을 때 사용하는 Airflow 기능이다.

이는 적은 양의 데이터에 대해서만 주고받을 때 유용하며, 데이터 프레임과 같은 큰 값을 전달하는 데에 사용하는 것은 적합하지 않다는 특징이 있다.

 

 

 

02. Variable과 XCOM

Variable과 XCOM 은 공통점과 차이점이 존재한다.

  Variable XCOM
공통점 key - value 형식 구성
차이점 전역적 DAG 내에서만 통신

 

key-value의 형식으로 구성되었다는 점이 Variable과 유사하다. 다만 Variable은 전역적이고, XCom은 DAG 내에서만 통신한다는 차이가 있다.

 

따라서, 전역변수로서 활용할 때는 Variable을 사용한다.

그렇지 않을 때는, XCOM을 이용한다. XCOM을 사용하면, 첫 번째 작업 실패 시 모든 재시도 작업에서 XCOM이 지워져 멱등성을 보장할 수 있다는 장점이 있다.

 

따라서, XCOM을 사용하면

 

1. 각기 다른 구성을 사용해 여러 작업을 실행할 때, 작업 간 구성 설정을 공유할 수 있다. DB에서 변화하는 구성설정 값을 조회 및 공유 가능하다.

2. 웹서비스 상태에 따라 다른 작업을 수행하는 워크플로 시, 웹서비스 상태를 조회하는 XCOM을 구성하여 분기 논리 제어가 가능하다.

3. 데이터 검색, 처리, DB 기록 워크플로우 시 독립적으로 각 task가 실행할 수 있으면서도 데이터를 공유할 수 있다.

 

이처럼, XCOM을 통해 워크플로우의 유연성을 높일 수 있다.

 


 

 

 

02. XCOM 사용방법

 

push-pull 을 이용해 XCOM을 사용하는 것이 일반적이다. (Jinja template에서 사용하기도 한다.)

 

  • xcom_push():  값을 저장
  • xcom_pull(): 값을 검색

 

즉, push를 통해 값을 저장하고, pull을 통해 값을 끌어 와 사용한다.

이때, PythonOperator을 사용하는 경우 return과 push는 하나의 task에서 동일하게 인식된다. 

 

나는 예시로 아래 링크에서 다룬 코드예제를 이용하여 테스트해보았다.

https://it-sunny-333.tistory.com/160

 

 

해당 예시는, 특정 aws에서 데이터를 다운받는 task & 해당 데이터를 split 하는 task로 구성돼 있다.

(전체 코드 구성은 위 출처 링크를 참고 부탁드리며, 아래 코드는 기능 별로 그룹화하였습니다.)

 

 

 

# 1. 데이터 입력 코드 (xcom_push)

 

우선 데이터를 입력하는 함수와, wrapper는 아래와 같다. 

# 1. 데이터 입력 코드 (xcom_push)

def extract(**context):
    url = context["params"]["url"]
    logging.info(url)
    f = requests.get(url)
    return f.text
    
    
exec_extract = PythonOperator(
    task_id="exec_extract",
    python_callable=extract,
    params={"url": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"},
    dag=dag,
)

 

해당 코드를 통해 만들어진 XCOM값을, Airflow Web UI를 통해 확인할 수 있다.

 

 

 

 

 

# 2. 데이터 추출 코드 (xcom_pull)

xcom_pull을 이용해, 저장한 데이터를 당겨 와 가공한다.

사실상 데이터를 추출하는 코드기도 하면서, PythonOperator - return 를 사용하였기에, 이 또한 xcom_push를 한 것이기도 하다.

즉, exec_extract task 를 pull 하여, exec_transform에 push 하는 코드다.

# 2. 데이터 추출 코드 (xcom_pull)

def transform(**context):
    text = context["task_instance"].xcom_pull(task_ids="exec_extract")
    lines = text.split("\n")
    return lines
    
    
 
 exec_transform = PythonOperator(
    task_id="exec_transform", python_callable=transform, dag=dag
)

 

이로 인해, exec_transform 로 최종 저장된 XCOM 또한 Web UI에서 확인 가능하다.

extract 코드에서 추출한 값을 잘 가공한 것을 알 수 있다.

 

 

 

 

 

 
 

 

 

이처럼, 상태에 따른 워크플로우 구성 시 혹은 task 간 데이터 전달 시 xcom을 활용한다면, 더욱 유연한 워크플로우 구성을 할 수 있겠다는 생각이 들었다.

 

 

공식 문서 링크는 아래와 같다.

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html

 

XComs — Airflow Documentation

 

airflow.apache.org