카테고리 없음

Apache Airflow 기초

sanook 2025. 2. 16. 22:38

Apache Airflow

Apache Airflow는 워크플로우를 자동화하고 관리하기 위한 오픈 소스 플랫폼입니다.

DAG(Directed Acyclic Graph)를 기반으로 작업(Task)을 정의하고 실행하며, 다양한 Operator를 활용해 여러 유형의 작업을 수행할 수 있습니다.

 

DAG (Directed Acyclic Graph)

DAG는 작업의 실행 순서를 정의하는 그래프로, 각 작업(Task)의 종속성을 표현합니다.

Airflow에서는 DAG를 통해 복잡한 워크플로우를 쉽게 정의할 수 있습니다.

 

주요 특징

  • 작업 간의 실행 순서를 정의합니다.
  • 반복 실행이 가능하며 스케줄링을 지원합니다.
  • DAG 내 모든 Task는 서로 연결되어야 합니다.

 

DAG 실행 (DAG Run)

DAG 실행(DAG Run)은 특정 시간에 DAG를 실행하는 개별적인 인스턴스를 의미합니다.

DAG는 설정된 schedule_interval에 따라 자동으로 실행 자동 실행되거나 사용자가 직접 UI나 CLI를 통해 수동으로 실행할 수 있습니다.

또한, API 등을 통해 외부에서 DAG 실행을 트리거할 수 있습니다.

 

상태

DAG 실행 상태는 state 속성을 통해 확인할 수 있으며, 주요 상태는 다음과 같습니다

  • running: DAG 실행이 진행 중인 상태
  • success: DAG 실행이 성공적으로 완료됨
  • failed: DAG 실행이 실패함
  • queued: DAG 실행이 대기열에 있음
  • skipped: DAG 실행이 건너뛰어짐
  • up_for_retry: DAG 실행이 재시도 상태임

DAG 실행 상태를 모니터링하려면 Airflow UI의 DAG Runs 탭을 활용하거나, CLI 명령어를 사용할 수 있습니다.

 

 

과거 데이터 처리

Backfill

Backfill은 특정 기간 동안 실행되지 않은 DAG Run을 수동으로 실행하는 기능입니다. CLI에서 다음과 같이 실행할 수 있습니다:

# 2024-01-01부터 2024-01-10까지 실행되지 않은 DAG를 채움
airflow dags backfill -s 2024-01-01 -e 2024-01-10 example_dag

위 명령어는 2024-01-01부터 2024-01-10까지의 실행되지 않은 DAG Run을 실행합니다.

 

Catchup

Catchup은 DAG의 catchup=True 설정을 통해 자동으로 실행되지 않은 과거 DAG 실행을 보충하는 기능입니다.

from airflow import DAG
from datetime import datetime

dag = DAG(
    dag_id='example_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True  # 과거 실행을 자동으로 채움
)

catchup=False로 설정하면 최신 실행만 수행되며, 과거 실행이 무시됩니다.

dag = DAG(
    dag_id='example_dag',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False  # 과거 실행을 무시하고 최신 실행만 수행
)

 

 

Trigger Rule

Task는 기본적으로 all_success 조건에서 실행되지만, trigger_rule을 사용하면 실행 조건을 변경할 수 있습니다.

  • all_success: 모든 부모 Task가 성공해야 실행됨 (기본값)
  • all_failed: 모든 부모 Task가 실패해야 실행됨
  • all_done: 부모 Task의 상태와 관계없이 실행됨
  • one_failed: 하나 이상의 부모 Task가 실패하면 실행됨
  • one_success: 하나 이상의 부모 Task가 성공하면 실행됨
  • none_failed: 부모 Task 중 하나라도 실패하지 않으면 실행됨
  • none_skipped: 부모 Task 중 하나라도 건너뛰어지지 않으면 실행됨
from airflow.operators.python import PythonOperator

def task_function():
    print("실행 중")

def task_final():
    print("Trigger Rule 조건 충족")

start = PythonOperator(task_id='start', python_callable=task_function)

task_1 = PythonOperator(task_id='task_1', python_callable=task_function)
task_2 = PythonOperator(task_id='task_2', python_callable=task_function)

task_3 = PythonOperator(task_id='task_3', python_callable=task_final, trigger_rule='one_success')

start >> [task_1, task_2] >> task_3

위 코드에서 task_3는 task_1 또는 task_2 중 하나라도 성공하면 실행됩니다.

 

 

Task (작업)

Task는 DAG 내에서 실행되는 개별 작업 단위입니다. Airflow에서는 다양한 Operator를 사용해 Task를 정의할 수 있습니다.

 

주요 특징

  • DAG 내에서 하나의 작업을 의미합니다.
  • Operator를 사용하여 실행할 작업을 정의합니다.
  • 다른 Task와 종속 관계를 설정할 수 있습니다.
  • @task 데코레이터를 사용하여 간결하게 Task를 정의할 수 있습니다.

 

@task 데코레이터 사용

Airflow 2.0부터는 @task 데코레이터를 사용하여 Python 함수를 간단하게 Task로 정의할 수 있습니다.

from airflow.decorators import task

@task
def print_message():
    print("Airflow Task 실행 중")

message_task = print_message()

 

Operator (연산자)

Operator는 특정 유형의 작업을 실행하는 기본 구성 요소입니다.

 

주요 Operator

PythonOperator

PythonOperator는 Python 함수를 실행하는 연산자로, Python 코드 실행이 필요할 때 사용됩니다.

from airflow.operators.python import PythonOperator

def print_hello():
    print("Hello, Airflow!")

python_task = PythonOperator(
    task_id='print_hello_task',
    python_callable=print_hello,
)

PythonOperator는 python_callable 매개변수를 사용해 실행할 함수를 지정합니다.

이 함수는 파라미터를 받을 수도 있으며, 실행 결과를 다른 Task에서 활용할 수도 있습니다.

 

 

BashOperator

Bash 명령어를 실행할 때 사용하는 연산자입니다.

from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id='print_date',
    bash_command='date'
)

 

Jinja 템플릿과 템플릿 기능

Airflow에서는 Jinja 템플릿을 사용하여 동적으로 값을 삽입할 수 있습니다. 이를 통해 실행 시간, 변수, XCom 데이터를 활용할 수 있습니다.

 

주요 기능

  • 실행 시간({{ ds }}) 등의 기본 변수 사용 가능
  • 사용자 정의 매크로 및 필터 사용 가능
  • XCom을 활용하여 Task 간 데이터 공유 가능
from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id='templated_bash',
    bash_command='echo "오늘 날짜는 {{ ds }} 입니다."'
)

위 코드에서 {{ ds }}는 실행 날짜를 의미하며, 실제 실행될 때 해당 날짜로 대체됩니다.

 

PythonOperator와 Jinja 템플릿

PythonOperator에서도 templates_dict를 활용하여 Jinja 템플릿을 사용할 수 있습니다.

from airflow.operators.python import PythonOperator

def print_date(**kwargs):
    execution_date = kwargs['templates_dict']['execution_date']
    print(f"실행 날짜: {execution_date}")

python_task = PythonOperator(
    task_id='templated_python',
    python_callable=print_date,
    templates_dict={'execution_date': '{{ ds }}'}
)

위 코드에서는 {{ ds }} 값이 execution_date에 전달되며, 실행 시 해당 날짜로 치환됩니다.

 

 

Task 분기 처리 방법

Airflow에서는 특정 조건에 따라 실행할 Task를 동적으로 결정하는 기능을 제공합니다.

대표적으로 BranchPythonOperator와 @task.branch 데코레이터를 사용할 수 있습니다.

 

1. BranchPythonOperator 활용

BranchPythonOperator는 특정 조건을 평가하고, 조건에 따라 실행할 Task를 결정하는 연산자입니다.

from airflow.operators.python import BranchPythonOperator, PythonOperator

def choose_branch():
    return 'task_a' if some_condition else 'task_b'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch
)

task_a = PythonOperator(task_id='task_a', python_callable=lambda: print("Task A 실행"))
task_b = PythonOperator(task_id='task_b', python_callable=lambda: print("Task B 실행"))

branch_task >> [task_a, task_b]

위 코드에서 choose_branch() 함수가 실행된 후, task_a 또는 task_b 중 하나만 실행됩니다.

 

2. @task.branch 데코레이터 활용

Airflow 2.0부터는 @task.branch 데코레이터를 사용하여 더욱 간결하게 Task 분기를 처리할 수 있습니다.

from airflow.decorators import task

@task.branch
def choose_branch():
    return 'task_a' if some_condition else 'task_b'

@task
def task_a():
    print("Task A 실행")

@task
def task_b():
    print("Task B 실행")

branch_task = choose_branch()
task_a = task_a()
task_b = task_b()

branch_task >> [task_a, task_b]

위 방식은 BranchPythonOperator와 동일한 기능을 수행하지만, @task.branch 데코레이터를 사용해 더욱 깔끔하게 분기 처리를 할 수 있습니다.

 

XCom (Cross-Communication)

XCom은 Task 간 데이터를 공유하는 기능을 제공합니다.

XCom을 활용하면 한 Task에서 데이터를 저장하고, 다른 Task에서 해당 데이터를 가져와 사용할 수 있습니다.

 

XCom 데이터 저장 및 가져오기

방법 1: ti 객체 활용

아래 코드는 TaskInstance (ti) 객체를 이용하여 데이터를 저장(XCom push)하고, 다른 Task에서 해당 데이터를 가져오는(XCom pull) 방법을 보여줍니다.

from airflow.operators.python import PythonOperator

def push_data(**kwargs):
    ti = kwargs['ti']  # TaskInstance 객체 가져오기
    ti.xcom_push(key='message', value='Hello, XCom!')

def pull_data(**kwargs):
    ti = kwargs['ti']
    message = ti.xcom_pull(task_ids='push_task', key='message')
    print(f"Received message: {message}")

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_data,
    provide_context=True
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_data,
    provide_context=True
)

push_task >> pull_task

push_data 함수는 ti.xcom_push(key, value)를 사용하여 데이터를 저장하고

pull_data 함수는 ti.xcom_pull(task_ids, key)를 사용하여 데이터를 가져옵니다.

두 Task는 push_task >> pull_task로 연결되어, 먼저 push_task가 실행된 후 pull_task에서 데이터를 가져오도록 설정됩니다.

 

 

방법 2: 함수의 return 값 활용

Airflow 2.0부터는 @task 데코레이터를 사용하여 간결한 방식으로 XCom을 활용할 수 있습니다. 아래 코드는 Task 간 데이터를 전달하는 또 다른 방법을 보여줍니다.

from airflow.decorators import task

@task
def push_data():
    return "Hello, XCom!"

@task
def pull_data(message):
    print(f"Received message: {message}")

push_task = push_data()
pull_task = pull_data(push_task)

push_data 함수의 return 값이 자동으로 XCom에 저장되고 pull_data 함수는 push_data에서 반환된 값을 인자로 받아 데이터를 출력하여 push_task >> pull_task의 흐름으로 실행됩니다.

이 방식은 코드가 더욱 간결해지며, 별도로 xcom_push() 및 xcom_pull()을 호출할 필요 없이 데이터를 주고받을 수 있다는 장점이 있습니다.

 

 

출저 : https://airflow.apache.org/docs/apache-airflow/stable/index.html