티스토리 뷰

공부

[airflow] backfill dag

승가비 2024. 4. 4. 00:44
728x90
import subprocess

from airflow.operators.python import PythonOperator
from batch.core.const import DUMMY, REPLAY, ___
from batch.core.default_airflow_dag import DefaultAirflowDAG, DagHistory
from batch.util.date_util import today, add_date, FORMAT_YYYY_MM_DD

PARAMS = {
    "dag_id": None,
}


def run(command):
    try:
        result = subprocess.run(
            command,
            check=True,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        print("Output:", result.stdout.decode())
    except subprocess.CalledProcessError as e:
        print("Error Message:", e.stderr.decode())


def airflow_dag_replay(
    dag_id,
    owner,
    start_date,
    description,
    schedule_interval,
    default_args,
    tags=[],
    args_f=DUMMY,
):
    def _replay(**kwargs):
        args = args_f()
        p = kwargs["params"]

        target = args["dag_id"].split(___)[-2] or p["dag_id"].split(___)[-2]
        ago = int(args["dag_id"].split(___)[-1] or p["dag_id"].split(___)[-1])

        end = today(format=FORMAT_YYYY_MM_DD)
        start = add_date(end, -ago, FORMAT_YYYY_MM_DD)

        run(f"airflow dags clear -s {start} -e {end} {target}")
        run(f"airflow dags backfill -s {start} -e {end} {target}")

    dag = DefaultAirflowDAG(
        dag_id,
        description=description,
        schedule_interval=schedule_interval,
        default_args=default_args,
        tags=tags + [REPLAY],
        history=[
            DagHistory(
                start_date,
                owner,
                "Init",
            )
        ],
        params=PARAMS
    )

    PythonOperator(
        task_id="replay",
        python_callable=_replay,
        provide_context=True,
        dag=dag
    )

    return dag

https://velog.io/@ujeongoh/Airflow%EC%97%90%EC%84%9C-Backfill-%EC%8B%A4%ED%96%89%ED%95%B4%EB%B3%B4%EA%B8%B0

 

Airflow에서 Backfill 실행해보기

Airflow에서 Backfill 은 이전에 실행되지 않은 작업을 재실행하는 프로세스이다. 만약 DAG가 매일 자정에 실행되도록 스케줄되어있고, 특정 기간 동안 실행되지 않았을 경우, Backfill 을 사용하여 그

velog.io

 

728x90

'공부' 카테고리의 다른 글

[JUnit] Parameterized  (0) 2024.04.04
[sql] update & join  (0) 2024.04.04
[github] peter-evans/create-pull-request  (0) 2024.04.03
[spring] server.shutdown=graceful  (0) 2024.04.03
[Thymeleaf] CSS add class  (0) 2024.04.03
댓글