안녕하세요 한소희입니다. 공부를 통해 배운 내용을 작성하고 있습니다. 혹여 해당 포스팅에서 잘못된 부분이 있을 경우, 알려주시면 빠르게 수정 조치하도록 하겠습니다.
01. Airflow XCOM이란?
XCOM 은, cross communcation 의 약자로 하나의 DAG 내 task간 데이터를 주고 받고 싶을 때 사용하는 Airflow 기능이다.
이는 적은 양의 데이터에 대해서만 주고받을 때 유용하며, 데이터 프레임과 같은 큰 값을 전달하는 데에 사용하는 것은 적합하지 않다는 특징이 있다.
02. Variable과 XCOM
Variable과 XCOM 은 공통점과 차이점이 존재한다.
Variable | XCOM | |
공통점 | key - value 형식 구성 | |
차이점 | 전역적 | DAG 내에서만 통신 |
key-value의 형식으로 구성되었다는 점이 Variable과 유사하다. 다만 Variable은 전역적이고, XCom은 DAG 내에서만 통신한다는 차이가 있다.
따라서, 전역변수로서 활용할 때는 Variable을 사용한다.
그렇지 않을 때는, XCOM을 이용한다. XCOM을 사용하면, 첫 번째 작업 실패 시 모든 재시도 작업에서 XCOM이 지워져 멱등성을 보장할 수 있다는 장점이 있다.
따라서, XCOM을 사용하면
1. 각기 다른 구성을 사용해 여러 작업을 실행할 때, 작업 간 구성 설정을 공유할 수 있다. DB에서 변화하는 구성설정 값을 조회 및 공유 가능하다.
2. 웹서비스 상태에 따라 다른 작업을 수행하는 워크플로 시, 웹서비스 상태를 조회하는 XCOM을 구성하여 분기 논리 제어가 가능하다.
3. 데이터 검색, 처리, DB 기록 워크플로우 시 독립적으로 각 task가 실행할 수 있으면서도 데이터를 공유할 수 있다.
이처럼, XCOM을 통해 워크플로우의 유연성을 높일 수 있다.
02. XCOM 사용방법
push-pull 을 이용해 XCOM을 사용하는 것이 일반적이다. (Jinja template에서 사용하기도 한다.)
- xcom_push(): 값을 저장
- xcom_pull(): 값을 검색
즉, push를 통해 값을 저장하고, pull을 통해 값을 끌어 와 사용한다.
이때, PythonOperator을 사용하는 경우 return과 push는 하나의 task에서 동일하게 인식된다.
나는 예시로 아래 링크에서 다룬 코드예제를 이용하여 테스트해보았다.
https://it-sunny-333.tistory.com/160
해당 예시는, 특정 aws에서 데이터를 다운받는 task & 해당 데이터를 split 하는 task로 구성돼 있다.
(전체 코드 구성은 위 출처 링크를 참고 부탁드리며, 아래 코드는 기능 별로 그룹화하였습니다.)
# 1. 데이터 입력 코드 (xcom_push)
우선 데이터를 입력하는 함수와, wrapper는 아래와 같다.
# 1. 데이터 입력 코드 (xcom_push)
def extract(**context):
url = context["params"]["url"]
logging.info(url)
f = requests.get(url)
return f.text
exec_extract = PythonOperator(
task_id="exec_extract",
python_callable=extract,
params={"url": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"},
dag=dag,
)
해당 코드를 통해 만들어진 XCOM값을, Airflow Web UI를 통해 확인할 수 있다.
# 2. 데이터 추출 코드 (xcom_pull)
xcom_pull을 이용해, 저장한 데이터를 당겨 와 가공한다.
사실상 데이터를 추출하는 코드기도 하면서, PythonOperator - return 를 사용하였기에, 이 또한 xcom_push를 한 것이기도 하다.
즉, exec_extract task 를 pull 하여, exec_transform에 push 하는 코드다.
# 2. 데이터 추출 코드 (xcom_pull)
def transform(**context):
text = context["task_instance"].xcom_pull(task_ids="exec_extract")
lines = text.split("\n")
return lines
exec_transform = PythonOperator(
task_id="exec_transform", python_callable=transform, dag=dag
)
이로 인해, exec_transform 로 최종 저장된 XCOM 또한 Web UI에서 확인 가능하다.
위 extract 코드에서 추출한 값을 잘 가공한 것을 알 수 있다.
이처럼, 상태에 따른 워크플로우 구성 시 혹은 task 간 데이터 전달 시 xcom을 활용한다면, 더욱 유연한 워크플로우 구성을 할 수 있겠다는 생각이 들었다.
공식 문서 링크는 아래와 같다.
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html
'데이터 공부 > Apache Airflow' 카테고리의 다른 글
Airflow를 helm으로 배포해보기 - (2) (0) | 2023.04.08 |
---|---|
Airflow를 helm으로 배포해보기 - (1) 사이드카 패턴 (0) | 2023.03.12 |
Airflow Helm 배포 - 설치하기 (0) | 2022.10.22 |
Airflow Helm 배포 - Helm 과 Chart 를 알아보기 (0) | 2022.10.22 |
Docker airflow2 커스텀 이미지 빌드 과정 (0) | 2022.04.02 |