공부
[airflow] backfill dag
승가비
2024. 4. 4. 00:44
728x90
import subprocess
from airflow.operators.python import PythonOperator
from batch.core.const import DUMMY, REPLAY, ___
from batch.core.default_airflow_dag import DefaultAirflowDAG, DagHistory
from batch.util.date_util import today, add_date, FORMAT_YYYY_MM_DD
PARAMS = {
"dag_id": None,
}
def run(command):
try:
result = subprocess.run(
command,
check=True,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
print("Output:", result.stdout.decode())
except subprocess.CalledProcessError as e:
print("Error Message:", e.stderr.decode())
def airflow_dag_replay(
dag_id,
owner,
start_date,
description,
schedule_interval,
default_args,
tags=[],
args_f=DUMMY,
):
def _replay(**kwargs):
args = args_f()
p = kwargs["params"]
target = args["dag_id"].split(___)[-2] or p["dag_id"].split(___)[-2]
ago = int(args["dag_id"].split(___)[-1] or p["dag_id"].split(___)[-1])
end = today(format=FORMAT_YYYY_MM_DD)
start = add_date(end, -ago, FORMAT_YYYY_MM_DD)
run(f"airflow dags clear -s {start} -e {end} {target}")
run(f"airflow dags backfill -s {start} -e {end} {target}")
dag = DefaultAirflowDAG(
dag_id,
description=description,
schedule_interval=schedule_interval,
default_args=default_args,
tags=tags + [REPLAY],
history=[
DagHistory(
start_date,
owner,
"Init",
)
],
params=PARAMS
)
PythonOperator(
task_id="replay",
python_callable=_replay,
provide_context=True,
dag=dag
)
return dag
Airflow에서 Backfill 실행해보기
Airflow에서 Backfill 은 이전에 실행되지 않은 작업을 재실행하는 프로세스이다. 만약 DAG가 매일 자정에 실행되도록 스케줄되어있고, 특정 기간 동안 실행되지 않았을 경우, Backfill 을 사용하여 그
velog.io
728x90